New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details →
Socket
Book a DemoSign in
Socket

work-queue

Package Overview
Dependencies
Maintainers
1
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

work-queue - npm Package Compare versions

Comparing version
0.0.1
to
0.0.2
bin/queue-reader

Sorry, the diff of this file is not supported yet

+30
-13

@@ -29,7 +29,23 @@ #

$.extend module.exports, {
push: (item) -> ready.wait( (err, queue) -> queue.push item ); @
push: (item) ->
unless 'type' of item
throw new Error("a 'type' is required on work items")
ready.wait( (err, queue) -> queue.push item ); @
pop: (cb) -> ready.wait( (err, queue) -> queue.pop cb ); @
clear: -> ready.wait( (err, queue) -> queue.clear() ); @
close: -> $.log("close wait"); return ready.wait (err, queue) -> queue.close()
close: -> ready.wait (err, queue) -> queue.close()
count: (cb) -> ready.wait( (err, queue) -> queue.count cb ); @
register: (type, handler) ->
ready.wait (err, queue) -> queue.register type, handler
createWorker: (opts) ->
w = $.Promise()
ready.wait (err, queue) ->
if err then w.reject err
else w.resolve queue.createWorker()
return {
pause: ->
w.then (worker) -> worker.pause()
resume: ->
w.then (worker) -> worker.resume()
}

@@ -49,3 +65,3 @@ connect: (url, opts) ->

q = db.collection(opts.collection)
q.ensureIndex { status: 1, readyAt: 1 }, unsafe
q.ensureIndex { status: 1, readyAt: 1 }, unsafe, (err) ->

@@ -97,4 +113,3 @@ getNewReaderId = -> $.random.string opts.id[0].length + opts.id[1], opts.id[0]

mtime: $.now
},
$inc: { retryCount: 1 }
}, "$inc": { retryCount: 1 }
}, { multi: false, safe: true }, cb

@@ -120,3 +135,3 @@

$.log "closing"
db.close()
db.close -> $.log "closed"
$.log "resetting", ready.promiseId

@@ -130,3 +145,3 @@ ready.reset()

$.log "clearing"
q.remove {}, -> $.log "clear result:", arguments
q.remove {}, (err) -> $.log "clear result:", err

@@ -136,2 +151,4 @@ count: (cb) -> q.count { status: "new" }, cb

log "inserting", item
unless 'type' of item
throw new Error("a 'type' is required on work items")
# TODO: if item has an _id that is already in the db

@@ -186,5 +203,5 @@ # and the doc in the db is status: "failed", allow overwrite

worker_opts = $.extend {
idle_delay: 100
busy_delay: 500
busy_max: 10
idle_delay: 100 # polling interval when the queue is empty
busy_delay: 500 # delay interval if we try to fetch new items while still working on an old one
busy_max: 10 # give up on a work item if we work on it for longer than (busy_max * busy_delay) ms
}, worker_opts

@@ -197,3 +214,3 @@ nextItem = (err) =>

when err then switch err
# Busy: we still have outstanding items
# Busy: we still have outstanding items
when "busy"

@@ -203,7 +220,7 @@ if ++busy_count > worker_opts.busy_max

readerId = getNewReaderId()
log "starting over with new worker id", readerId
log = $.logger "worker #{readerId}"
log "starting over with new reader id"
$.delay worker_opts.busy_delay, nextItem
# Unknown Error: just log it and move on
else $.log "pop error:", err
else log "pop error:", err
# Idle: wait for idle_delay and poll the next item

@@ -210,0 +227,0 @@ when (not item?) and (not done?) then $.delay worker_opts.idle_delay, nextItem

MOCHA=node_modules/.bin/mocha
MOCHA_FMT=spec
MOCHA_OPTS=--compilers coffee:coffee-script --globals Bling,$$ -R ${MOCHA_FMT} -s 100
MOCHA_OPTS=--compilers coffee:coffee-script/register --globals Bling,$$ -R ${MOCHA_FMT} -s 100 --bail

@@ -6,0 +6,0 @@ all: test

{
"name": "work-queue",
"version": "0.0.1",
"version": "0.0.2",
"description": "Process scheduled items from a queue held in MongoDB.",

@@ -15,4 +15,5 @@ "main": "index.coffee",

"bling": "latest",
"mongodb": "latest"
"mongodb": "latest",
"optimist": "latest"
}
}
+53
-24

@@ -13,11 +13,11 @@ The Work Queue

```coffee
```
WorkQueue = require 'work-queue'
WorkQueue = require('work-queue')
WorkQueue.connect "mongodb://localhost:27017/test", {
WorkQueue.connect("mongodb://localhost:27017/test", {
collection: "workQueue",
readerId: [ "reader-", 5 ], # 5 chars of randomness
# consider, readerId: "app-server-56"
}
readerId: [ "reader-", 5 ], // 5 chars of randomness
// consider, readerId: "app-server-56"
})

@@ -31,6 +31,6 @@ ```

WorkQueue.push {
type: "my-type"
WorkQueue.push({
type: "my-type",
schedule: { at: timestamp }
}
})

@@ -42,9 +42,9 @@ ```

* `every` with an interval in ms, example:
```coffee
WorkQueue.push { type: "foo", schedule: { every: 30*1000} }
```
WorkQueue.push({ type: "foo", schedule: { every: 30*1000} })
```
* `after` with a delay in ms, example:
```coffee
WorkQueue.push { type: "foo", schedule: { after: 5*60*1000 } }
```
WorkQueue.push({ type: "foo", schedule: { after: 5*60*1000 } })
```

@@ -72,16 +72,18 @@ You can combine `every` and `after`, to control when the first iteration occurs.

```coffee
WorkQueue.register 'my-type', (item, done) ->
# item has all the fields shown above
doWorkOnItem item, (err) ->
if err then done(err) # fail
else done() # all ok!
```
WorkQueue.register('my-type', function(item, done) {
// item has all the fields shown above
doWorkOnItem(item, function (err) {
if(err) { done(err) }
else { done() }
})
})
worker = WorkQueue.createWorker {
idle_delay: 100 # polling interval if nothing to do
}
worker = WorkQueue.createWorker({
idle_delay: 100 // polling interval if nothing to do
})
worker.resume()
# run this example for 10 seconds, then pause
setTimeout worker.pause, 10000
// run this example for 10 seconds, then pause
setTimeout(worker.pause, 10000)

@@ -91,1 +93,28 @@ ```

A usable example can be found in `bin/queue-reader.coffee`.
bin/queue-reader.coffee
-----------------------
```
Usage: queue-reader [options...] mongodb://host:port/db_name
Options:
-c, --collection the collection to hold work orders in [default: "workQueue"]
-i, --interval when idle, how often to look for new work [default: 100]
-r, --require require this/these module(s), which should export type handlers [default: ""]
-d, --demo DANGEROUS: load an example queue as a test, will flush all jobs in the specificed collection [default: false]
```
The `-r` or `--require` option is the most important if you want to do real work. It can be given multiple times, and each string given to it will be passed to `require()` within the reader script.
Each module required in this way should export an object full of `{ type: handler }` pairs.
Example:
```
module.exports['echo'] = function (item, done) {
console.log(item)
done()
}
```
The `-i` or `--interval` option is only meaningful when the queue is empty. When each work item is completed, a check for new work is performed immediately.

@@ -16,9 +16,10 @@ $ = require 'bling'

'''
describe '.push()', ->
it "adds work items (chainable)", (pass) ->
Q.clear().push( type: "my-type", schedule: { at: $.now } ).count (err, count) ->
assert.equal err, null
assert.equal count, 1
pass()
Q.clear().push( type: "my-type", schedule: { at: $.now } )
$.delay 30, ->
Q.count (err, count) ->
assert.equal err, null
assert.equal count, 1
pass()
it "rejects items without a type", ->

@@ -29,21 +30,44 @@ assert.throws -> Q.push( notype: "haha" )

it "yields the next item, and a finalizer", (pass) ->
Q.clear().push( type: "pop-test", schedule: { at: $.now } ).pop (item, done) ->
assert.equal item.type, "pop-test"
done()
Q.count (err, count) ->
Q.clear().push( type: "pop-test", schedule: { at: $.now } )
$.delay 30, -> # make sure that the new item is really 'due'
Q.pop (err, item, done) ->
assert.equal err, null
assert.equal count, 0
pass()
assert.notEqual item, null
assert $.is 'function', done
assert.equal item.type, "pop-test"
done()
$.delay 30, ->
Q.count (err, count) ->
assert.equal err, null
assert.equal count, 0
pass()
describe ".register()", ->
it "can register handlers by type", ->
Q.register "my-type", (err, item, done) ->
describe ".createWorker()", ->
it "creates a Worker"
it "creates a Worker", ->
worker = Q.createWorker()
assert $.is 'function', worker.pause
assert $.is 'function', worker.resume
describe "a Worker", ->
it "can pause"
it "can resume"
it "can register handlers by type"
it "processes jobs when running"
it "processes jobs when running", (pass) ->
Q.register "test-type", (item, done) ->
assert.equal $.type(done), 'function'
done()
assert.notEqual item, null
assert.equal item.type, "test-type"
pass()
'''
it "can be closed", (pass) ->
assert $.is 'function', Q.close
assert $.is 'promise', Q.close().wait pass
Q.clear().push( { type: "test-type", schedule: { at: $.now } } )
worker = Q.createWorker()
worker.resume()
$.delay 500, worker.pause
describe ".close()", ->
it "is a function", ->
assert $.is 'function', Q.close
it "returns a promise", (done) ->
Q.close().wait (err) -> done()
it "should pause all workers"
Optimist = require('optimist')
$.log "Options:", opts = Optimist \
.options('c', {
alias: 'collection',
default: 'workQueue',
describe: 'the collection to hold work orders in'
}) \
.options('i', {
alias: 'interval',
default: 100
describe: 'when idle, how often to look for new work'
}) \
.options('r', {
alias: 'require',
default: '',
describe: 'require this/these module(s), which should export type handlers'
}) \
.option('d', {
alias: 'demo',
default: false
describe: 'DANGEROUS: load an example queue as a test, will flush all jobs in the specificed collection'
}) \
.demand(1)
.check( (argv) ->
unless /^mongodb:/.test argv._
throw new Error("url must begin with mongodb://")
)
.usage("Usage: $0 [options...] mongodb://host:port/db_name")
.argv
url = opts._
mins = 60*1000
W = require "../index.coffee"
if $.is 'array', opts.require
for r in opts.require
$.log "Requiring '#{r}'..."
for type, handler of require(r)
$.assert ($.is 'string', type), "type must be string: #{type}"
$.assert ($.is 'function', handler), "handler must be function: #{handler}"
W.register type, handler
W.connect url, { collection: opts.collection }
worker = W.createWorker {
idle_delay: opts.interval
}
worker.resume()
if opts.demo
W.register 'echo', (item, done) ->
$.log "ECHO:", item.message
done()
W.clear().push(
type: "echo"
schedule: { every: .1*mins, maxFail: Infinity }
message: "Should recur every six seconds"
_id: "only_one"
).push(
type: "echo"
schedule: { after: .5*mins }
message: "Once after thirty seconds"
).push(
type: "echo"
schedule: { at: $.now + 3000 }
message: "Once after three seconds"
)
$.delay 32000, ->
$.log "Ending demo."
worker.pause()