@transformation/core
Advanced tools
Comparing version 4.0.1 to 4.1.0
{ | ||
"name": "@transformation/core", | ||
"version": "4.0.1", | ||
"version": "4.1.0", | ||
"description": "Create transformation pipelines", | ||
@@ -27,3 +27,3 @@ "main": "./src/index.js", | ||
"unexpected": "12.0.3", | ||
"unexpected-steps": "^4.0.1" | ||
"unexpected-steps": "^4.1.0" | ||
}, | ||
@@ -33,3 +33,3 @@ "engines": { | ||
}, | ||
"gitHead": "ecfc77e02c973039eeed3f6d218953ab134d2c03" | ||
"gitHead": "515bd8a714852520556f12b42640257e0463eecf" | ||
} |
@@ -0,6 +1,21 @@ | ||
const { go, put, close, clone, CLOSED, take } = require("medium"); | ||
const channelStep = (body) => ({ | ||
type: "step", | ||
body, | ||
body: (input, errors) => { | ||
const bodyErrors = clone(errors); | ||
const output = body(input, bodyErrors); | ||
go(async () => { | ||
const value = await take(bodyErrors); | ||
await put(errors, value); | ||
if (value !== CLOSED) { | ||
close(input); | ||
} | ||
}); | ||
return output; | ||
}, | ||
}); | ||
module.exports = channelStep; |
@@ -21,3 +21,3 @@ const { go, close, CLOSED, chan, put, take } = require("medium"); | ||
const value = await take(input); | ||
if (value === CLOSED || output.isClosed) break; | ||
if (value === CLOSED) break; | ||
@@ -39,3 +39,2 @@ const chosen = cases[selector(value)]; | ||
} finally { | ||
close(input); | ||
close(output); | ||
@@ -42,0 +41,0 @@ } |
@@ -1,12 +0,37 @@ | ||
const step = require("./step"); | ||
const { go, close, CLOSED, chan, put, take } = require("medium"); | ||
const channelStep = require("./channelStep"); | ||
const emitAll = (...iterables) => | ||
step(async ({ take, put, CLOSED }) => { | ||
for (const iterable of iterables) { | ||
for await (const item of iterable) { | ||
await put(item); | ||
channelStep((input, errors) => { | ||
const output = chan(); | ||
let closing = false; | ||
go(async () => { | ||
while (true) { | ||
const value = await take(input); | ||
if (value === CLOSED) break; | ||
} | ||
} | ||
closing = true; | ||
}); | ||
go(async () => { | ||
try { | ||
for (const iterable of iterables) { | ||
if (closing) break; | ||
for await (const item of iterable) { | ||
if (closing) break; | ||
await put(output, item); | ||
} | ||
} | ||
} catch (err) { | ||
await put(errors, err); | ||
} finally { | ||
close(output); | ||
} | ||
}); | ||
return output; | ||
}); | ||
module.exports = emitAll; |
@@ -1,10 +0,5 @@ | ||
const step = require("./step"); | ||
const emitAll = require("./emitAll"); | ||
const emitItems = (...items) => | ||
step(async ({ take, put, CLOSED }) => { | ||
for (const item of items) { | ||
await put(item); | ||
} | ||
}); | ||
const emitItems = (...items) => emitAll(items); | ||
module.exports = emitItems; |
@@ -1,2 +0,2 @@ | ||
const { go, chan, close, CLOSED, take } = require("medium"); | ||
const { go, chan, close, CLOSED, take, sleep } = require("medium"); | ||
@@ -27,3 +27,11 @@ const flush = async (stepOrChannel) => { | ||
close(errors); | ||
close(input); | ||
try { | ||
const error = await Promise.race([take(errors), sleep(0)]); | ||
if (error) throw error; | ||
} finally { | ||
close(errors); | ||
} | ||
if (error) throw error; | ||
@@ -30,0 +38,0 @@ }; |
@@ -10,7 +10,7 @@ const { go, close, CLOSED, chan, put, take } = require("medium"); | ||
let i = 0; | ||
while (true) { | ||
const value = await take(input); | ||
if (value === CLOSED) break; | ||
try { | ||
while (true) { | ||
const value = await take(input); | ||
if (value === CLOSED) break; | ||
try { | ||
const mappedValue = await mapper(value, i++); | ||
@@ -24,8 +24,11 @@ | ||
const valueInput = chan(); | ||
const valueOutput = mappedValue.body(valueInput, errors); | ||
close(valueInput); | ||
while (true) { | ||
const item = await take(valueOutput); | ||
if (item === CLOSED) break; | ||
await put(output, item); | ||
try { | ||
const valueOutput = mappedValue.body(valueInput, errors); | ||
while (true) { | ||
const item = await take(valueOutput); | ||
if (item === CLOSED) break; | ||
await put(output, item); | ||
} | ||
} finally { | ||
close(valueInput); | ||
} | ||
@@ -35,8 +38,8 @@ } else { | ||
} | ||
} catch (e) { | ||
put(errors, e); | ||
} | ||
} catch (e) { | ||
await put(errors, e); | ||
} finally { | ||
close(output); | ||
} | ||
close(output); | ||
}); | ||
@@ -43,0 +46,0 @@ |
const expect = require("unexpected").clone().use(require("unexpected-steps")); | ||
const emitItems = require("./emitItems"); | ||
const emitRange = require("./emitRange"); | ||
const forEach = require("./forEach"); | ||
const map = require("./map"); | ||
const pipeline = require("./pipeline"); | ||
const emitRange = require("./emitRange"); | ||
const program = require("./program"); | ||
const toArray = require("./toArray"); | ||
const map = require("./map"); | ||
@@ -43,2 +45,51 @@ describe("map", () => { | ||
}); | ||
it("stops proceesing when an error occurs", async () => { | ||
const processed = []; | ||
await expect( | ||
() => | ||
program( | ||
emitItems(0, 1, 2, 3, 4, "bomb", 5, 6, 7, 8, 9), | ||
map((n) => { | ||
if (n === "bomb") { | ||
throw new Error("Boom!"); | ||
} | ||
return n; | ||
}), | ||
forEach((item) => processed.push(item)) | ||
), | ||
"to error", | ||
"Boom!" | ||
); | ||
expect(processed, "to equal", [0, 1, 2, 3, 4]); | ||
}); | ||
it("stops proceesing when an error occurs inside of a sub-pipeline", async () => { | ||
const processed = []; | ||
await expect( | ||
() => | ||
program( | ||
emitItems(0, 1, 2, 3, 4, "bomb", 5, 6, 7, 8, 9), | ||
map((n) => | ||
pipeline( | ||
emitItems(n), | ||
map((n) => { | ||
if (n === "bomb") { | ||
throw new Error("Boom!"); | ||
} | ||
return n; | ||
}) | ||
) | ||
), | ||
forEach((item) => processed.push(item)) | ||
), | ||
"to error", | ||
"Boom!" | ||
); | ||
expect(processed, "to equal", [0, 1, 2, 3, 4, 5]); | ||
}); | ||
}); |
@@ -31,4 +31,6 @@ const { go, take, close, chan, merge, put, CLOSED } = require("medium"); | ||
const parallel = (step, concurrency = 2 * cpus) => | ||
channelStep((input, errors) => { | ||
const parallel = (step, concurrency = 2 * cpus) => { | ||
if (concurrency < 2) return step; | ||
return channelStep((input, errors) => { | ||
const outputs = []; | ||
@@ -42,3 +44,3 @@ const output = chan(concurrency); | ||
const parallelOutput = buffer(concurrency).body(merge(...outputs)); | ||
const parallelOutput = buffer(concurrency).body(merge(...outputs), errors); | ||
@@ -88,3 +90,4 @@ go(async () => { | ||
}); | ||
}; | ||
module.exports = parallel; |
@@ -10,3 +10,3 @@ const { chan, go, take, put, close, CLOSED } = require("medium"); | ||
go(async () => { | ||
let channel = input || chan(); | ||
let channel = input; | ||
@@ -31,6 +31,2 @@ try { | ||
if (!input) { | ||
close(channel); | ||
} | ||
while (true) { | ||
@@ -45,3 +41,2 @@ const value = await take(channel); | ||
} finally { | ||
close(input); | ||
close(output); | ||
@@ -48,0 +43,0 @@ } |
@@ -9,7 +9,3 @@ const { go, close, CLOSED, chan, put, take } = require("medium"); | ||
const takeWrapper = () => take(input); | ||
const putWrapper = async (value) => { | ||
const open = await put(output, value); | ||
if (!open) close(input); | ||
return open; | ||
}; | ||
const putWrapper = (value) => put(output, value); | ||
@@ -20,3 +16,2 @@ go(async () => { | ||
} catch (err) { | ||
close(input); | ||
await put(errors, err); | ||
@@ -23,0 +18,0 @@ } finally { |
@@ -7,4 +7,4 @@ const map = require("./map"); | ||
const withGroup = (...steps) => | ||
map(async (group) => | ||
Group.isGroup(group) | ||
map(async (group) => { | ||
return Group.isGroup(group) | ||
? { | ||
@@ -14,5 +14,5 @@ ...group, | ||
} | ||
: group | ||
); | ||
: group; | ||
}); | ||
module.exports = withGroup; |
@@ -0,8 +1,12 @@ | ||
const Group = require("./Group"); | ||
const emitItems = require("./emitItems"); | ||
const expect = require("unexpected").clone().use(require("unexpected-steps")); | ||
const emitItems = require("./emitItems"); | ||
const extend = require("./extend"); | ||
const forEach = require("./forEach"); | ||
const groupBy = require("./groupBy"); | ||
const map = require("./map"); | ||
const partition = require("./partition"); | ||
const pipeline = require("./pipeline"); | ||
const groupBy = require("./groupBy"); | ||
const extend = require("./extend"); | ||
const program = require("./program"); | ||
const withGroup = require("./withGroup"); | ||
const Group = require("./Group"); | ||
@@ -45,2 +49,30 @@ describe("withGroup", () => { | ||
}); | ||
it("stops processing if an error occurs", async () => { | ||
const processed = []; | ||
await expect( | ||
() => | ||
program( | ||
emitItems(0, 1, 2, 3, 4, "bomb", 5, 6, 7, 8, 9, 10, 11, 12), | ||
partition(2), | ||
withGroup( | ||
map((n) => { | ||
if (n === "bomb") { | ||
throw new Error("Boom!"); | ||
} | ||
return n; | ||
}) | ||
), | ||
forEach((group) => processed.push(group)) | ||
), | ||
"to error", | ||
"Boom!" | ||
); | ||
expect(processed, "to equal", [ | ||
{ key: "[0;1]", items: [0, 1] }, | ||
{ key: "[2;3]", items: [2, 3] }, | ||
]); | ||
}); | ||
}); |
143165
3543