thread-stream
Advanced tools
Comparing version 0.3.1 to 0.3.2
@@ -6,3 +6,3 @@ 'use strict' | ||
const { join } = require('path') | ||
const wait = require('./lib/wait') | ||
const { wait } = require('./lib/wait') | ||
const { | ||
@@ -164,3 +164,5 @@ WRITE_INDEX, | ||
flush (cb) { | ||
// TODO write all .buf | ||
const writeIndex = Atomics.load(this._state, WRITE_INDEX) | ||
// process._rawDebug(`(flush) readIndex (${Atomics.load(this._state, READ_INDEX)}) writeIndex (${Atomics.load(this._state, WRITE_INDEX)})`) | ||
wait(this._state, READ_INDEX, writeIndex, Infinity, (err, res) => { | ||
@@ -167,0 +169,0 @@ if (err) { |
'use strict' | ||
const MAX_TIMEOUT = 1000 | ||
function wait (state, index, expected, timeout, done) { | ||
@@ -11,12 +13,11 @@ const max = Date.now() + timeout | ||
let prior = current | ||
const check = () => { | ||
const check = (backoff) => { | ||
if (Date.now() > max) { | ||
done(null, 'timed-out') | ||
} else { | ||
// Maybe use a backoff algorithm | ||
setImmediate(() => { | ||
setTimeout(() => { | ||
prior = current | ||
current = Atomics.load(state, index) | ||
if (current === prior) { | ||
check() | ||
check(backoff >= MAX_TIMEOUT ? MAX_TIMEOUT : backoff * 2) | ||
} else { | ||
@@ -26,8 +27,37 @@ if (current === expected) done(null, 'ok') | ||
} | ||
}) | ||
}, backoff) | ||
} | ||
} | ||
check() | ||
check(1) | ||
} | ||
module.exports = wait | ||
// let waitDiffCount = 0 | ||
function waitDiff (state, index, expected, timeout, done) { | ||
// const id = waitDiffCount++ | ||
// process._rawDebug(`>>> waitDiff ${id}`) | ||
const max = Date.now() + timeout | ||
let current = Atomics.load(state, index) | ||
if (current !== expected) { | ||
done(null, 'ok') | ||
return | ||
} | ||
const check = (backoff) => { | ||
// process._rawDebug(`${id} ${index} current ${current} expected ${expected}`) | ||
// process._rawDebug('' + backoff) | ||
if (Date.now() > max) { | ||
done(null, 'timed-out') | ||
} else { | ||
setTimeout(() => { | ||
current = Atomics.load(state, index) | ||
if (current !== expected) { | ||
done(null, 'ok') | ||
} else { | ||
check(backoff >= MAX_TIMEOUT ? MAX_TIMEOUT : backoff * 2) | ||
} | ||
}, backoff) | ||
} | ||
} | ||
check(1) | ||
} | ||
module.exports = { wait, waitDiff } |
@@ -5,2 +5,3 @@ 'use strict' | ||
const { WRITE_INDEX, READ_INDEX } = require('./indexes') | ||
const { waitDiff } = require('./wait') | ||
const fn = require(workerData.filename) | ||
@@ -43,4 +44,4 @@ | ||
function run () { | ||
let current = Atomics.load(state, READ_INDEX) | ||
let end = Atomics.load(state, WRITE_INDEX) | ||
const current = Atomics.load(state, READ_INDEX) | ||
const end = Atomics.load(state, WRITE_INDEX) | ||
@@ -50,10 +51,8 @@ // process._rawDebug(`pre state ${current} ${end}`) | ||
if (end === current) { | ||
Atomics.wait(state, WRITE_INDEX, end, 100) | ||
end = Atomics.load(state, WRITE_INDEX) | ||
current = Atomics.load(state, READ_INDEX) | ||
if (end === current) { | ||
setImmediate(run) | ||
return | ||
if (end === data.length) { | ||
waitDiff(state, READ_INDEX, end, Infinity, run) | ||
} else { | ||
waitDiff(state, WRITE_INDEX, end, Infinity, run) | ||
} | ||
return | ||
} | ||
@@ -74,7 +73,15 @@ | ||
destination.write(toWrite, function () { | ||
const res = destination.write(toWrite) | ||
if (res) { | ||
Atomics.store(state, READ_INDEX, end) | ||
Atomics.notify(state, READ_INDEX) | ||
run() | ||
}) | ||
setImmediate(run) | ||
} else { | ||
destination.once('drain', function () { | ||
Atomics.store(state, READ_INDEX, end) | ||
Atomics.notify(state, READ_INDEX) | ||
run() | ||
}) | ||
} | ||
} |
{ | ||
"name": "thread-stream", | ||
"version": "0.3.1", | ||
"version": "0.3.2", | ||
"description": "A streaming way to send data to a Node.js Worker Thread", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
21239
692