Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

atomic-queue

Package Overview
Dependencies
Maintainers
1
Versions
11
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

atomic-queue - npm Package Compare versions

Comparing version 2.0.0 to 3.0.0

26

index.js

@@ -22,7 +22,6 @@ var events = require('events')

this.worker = worker
this.concurrency = opts.concurrency || 1
this.db = opts.db || memdb()
this.opts = opts
this.pool = createPool(this.worker, opts)
this.pool = createPool(worker, opts)
this.changes = createChangeDB({

@@ -36,2 +35,5 @@ db: this.db,

this.stream = this.createDuplexStream()
this.stream._queue = this
this.pool.on('start', function start (data, worker, change) {

@@ -49,3 +51,3 @@ var changeNum = change.change

this.on('update-start', function updateStart (data) {
this.stream.on('update-start', function updateStart (data) {
debug('update-start', data)

@@ -55,3 +57,3 @@ self.updatingInflight = true

this.on('update-end', function updateEnd (data) {
this.stream.on('update-end', function updateEnd (data) {
debug('update-end', data)

@@ -61,5 +63,2 @@ self.updatingInflight = false

this.stream = this.createDuplexStream()
this.stream._queue = this
events.EventEmitter.call(this)

@@ -88,4 +87,4 @@

this.initialize(function ready (err) {
if (err) return self.emit('error', err)
self.emit('ready')
if (err) return self.stream.destroy(err)
self.stream.emit('ready', self.inflight)
var readStream = self.createWorkStream({since: self.inflight.since, live: true})

@@ -114,3 +113,2 @@ duplexStream.setReadable(readStream)

cb()
// also kick off the worker

@@ -123,4 +121,2 @@ proc.work(data.value.value, doneWorking, data)

self.emit('finish', data)
// TODO implement purging. should remove processed entries from the changes feed

@@ -133,6 +129,6 @@

function update () {
if (self.updatingInflight) return self.once('update-end', update)
self.emit('update-start', inflight)
if (self.updatingInflight) return self.stream.once('update-end', update)
self.stream.emit('update-start', inflight)
self.db.put('inflight', inflight, function updated (err) {
self.emit('update-end', inflight)
self.stream.emit('update-end', inflight)
if (err) splitStream.destroy(err)

@@ -139,0 +135,0 @@ if (output) splitStream.push(output)

{
"name": "atomic-queue",
"version": "2.0.0",
"version": "3.0.0",
"description": "a crash friendly queue that persists queue state and can restart. uses a worker pool and has configurable concurrency",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -7,6 +7,6 @@ var events = require('events')

function Pool (workFn, opts) {
if (!(this instanceof Pool)) return new Pool(workFn, opts)
function Pool (workerTemplate, opts) {
if (!(this instanceof Pool)) return new Pool(workerTemplate, opts)
if (!opts) opts = {}
this.workFn = workFn
this.workerTemplate = workerTemplate
this.working = 0

@@ -23,4 +23,8 @@ this.limit = opts.concurrency || 1

var workers = []
var useExistingWorkers = false
if (Array.isArray(this.workerTemplate)) useExistingWorkers = true
for (var i = 0; i < this.limit; i++) {
var worker = createWorker(this.workFn)
var workFn = useExistingWorkers ? this.workerTemplate[i] : this.workerTemplate
var worker = createWorker(workFn)

@@ -27,0 +31,0 @@ // consolidate events

@@ -35,24 +35,28 @@ # atomic-queue

### queue.on
### events
you can listen to the following events:
in addition to standard stream events you can also listen to the following:
#### error
#### queue.on('ready')
emitted after startup when the queue state has been read from disk and the queue is now ready to start working
#### queue.on('error')
when a catastrophic error has occurred
#### start
#### queue.on('update-start')
when a job starts working
when the queue starts flushing its state to disk
#### end
#### queue.on('update-end')
when a job finishes working
when the queue finishes flushing its state to disk
#### update-start
#### queue.pool.on('start')
when the queue starts flushing its state to disk
when a job starts working
#### update-end
#### queue.pool.on('finish')
when the queue finishes flushing its state to disk
when a job finishes working
var events = require('events')
var inherits = require('inherits')
var debug = require('debug')('atomic-queue-worker')

@@ -18,5 +19,7 @@ module.exports = Worker

this.emit('start', data, change)
debug('start', change.change)
this.working = true
this.workFn(data, function done (err, output) {
self.working = false
debug('finish', change.change)
if (err) self.emit('error', err)

@@ -23,0 +26,0 @@ self.emit('finish', output, data, change)

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc