Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

thread-stream

Package Overview
Dependencies
Maintainers
1
Versions
43
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

thread-stream - npm Package Compare versions

Comparing version 0.3.1 to 0.3.2

4

index.js

@@ -6,3 +6,3 @@ 'use strict'

const { join } = require('path')
const wait = require('./lib/wait')
const { wait } = require('./lib/wait')
const {

@@ -164,3 +164,5 @@ WRITE_INDEX,

flush (cb) {
// TODO write all .buf
const writeIndex = Atomics.load(this._state, WRITE_INDEX)
// process._rawDebug(`(flush) readIndex (${Atomics.load(this._state, READ_INDEX)}) writeIndex (${Atomics.load(this._state, WRITE_INDEX)})`)
wait(this._state, READ_INDEX, writeIndex, Infinity, (err, res) => {

@@ -167,0 +169,0 @@ if (err) {

'use strict'
const MAX_TIMEOUT = 1000
function wait (state, index, expected, timeout, done) {

@@ -11,12 +13,11 @@ const max = Date.now() + timeout

let prior = current
const check = () => {
const check = (backoff) => {
if (Date.now() > max) {
done(null, 'timed-out')
} else {
// Maybe use a backoff algorithm
setImmediate(() => {
setTimeout(() => {
prior = current
current = Atomics.load(state, index)
if (current === prior) {
check()
check(backoff >= MAX_TIMEOUT ? MAX_TIMEOUT : backoff * 2)
} else {

@@ -26,8 +27,37 @@ if (current === expected) done(null, 'ok')

}
})
}, backoff)
}
}
check()
check(1)
}
module.exports = wait
// let waitDiffCount = 0
function waitDiff (state, index, expected, timeout, done) {
// const id = waitDiffCount++
// process._rawDebug(`>>> waitDiff ${id}`)
const max = Date.now() + timeout
let current = Atomics.load(state, index)
if (current !== expected) {
done(null, 'ok')
return
}
const check = (backoff) => {
// process._rawDebug(`${id} ${index} current ${current} expected ${expected}`)
// process._rawDebug('' + backoff)
if (Date.now() > max) {
done(null, 'timed-out')
} else {
setTimeout(() => {
current = Atomics.load(state, index)
if (current !== expected) {
done(null, 'ok')
} else {
check(backoff >= MAX_TIMEOUT ? MAX_TIMEOUT : backoff * 2)
}
}, backoff)
}
}
check(1)
}
module.exports = { wait, waitDiff }

@@ -5,2 +5,3 @@ 'use strict'

const { WRITE_INDEX, READ_INDEX } = require('./indexes')
const { waitDiff } = require('./wait')
const fn = require(workerData.filename)

@@ -43,4 +44,4 @@

function run () {
let current = Atomics.load(state, READ_INDEX)
let end = Atomics.load(state, WRITE_INDEX)
const current = Atomics.load(state, READ_INDEX)
const end = Atomics.load(state, WRITE_INDEX)

@@ -50,10 +51,8 @@ // process._rawDebug(`pre state ${current} ${end}`)

if (end === current) {
Atomics.wait(state, WRITE_INDEX, end, 100)
end = Atomics.load(state, WRITE_INDEX)
current = Atomics.load(state, READ_INDEX)
if (end === current) {
setImmediate(run)
return
if (end === data.length) {
waitDiff(state, READ_INDEX, end, Infinity, run)
} else {
waitDiff(state, WRITE_INDEX, end, Infinity, run)
}
return
}

@@ -74,7 +73,15 @@

destination.write(toWrite, function () {
const res = destination.write(toWrite)
if (res) {
Atomics.store(state, READ_INDEX, end)
Atomics.notify(state, READ_INDEX)
run()
})
setImmediate(run)
} else {
destination.once('drain', function () {
Atomics.store(state, READ_INDEX, end)
Atomics.notify(state, READ_INDEX)
run()
})
}
}
{
"name": "thread-stream",
"version": "0.3.1",
"version": "0.3.2",
"description": "A streaming way to send data to a Node.js Worker Thread",

@@ -5,0 +5,0 @@ "main": "index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc