Comparing version 1.0.1 to 1.0.2
@@ -19,11 +19,3 @@ 'use strict' | ||
exports.issue = issue | ||
exports.within = within | ||
// Returns milliseconds to wait while ts is within l. | ||
function within (l, ts) { | ||
const diff = new Date().getTime() - ts | ||
return Math.max(0, l - diff) | ||
} | ||
function createHeaders (client, moreHeaders) { | ||
@@ -81,149 +73,114 @@ let headers = { | ||
return mod.request(opts) | ||
} | ||
const req = mod.request(opts) | ||
// Issues request with action for uri. When the request-response cycle | ||
// is complete the callback executes. | ||
// | ||
// cb (er, statusCode, url, body) | ||
// | ||
// The er and the body parameters are optional. | ||
function issue (client, action, uri, cb = () => {}) { | ||
const log = client.log | ||
req.setTimeout(client.timeout) | ||
// Accepting URL and String types. | ||
uri = uri.href || uri | ||
return req | ||
} | ||
debug(uri) | ||
// Returns a response handler closing over cb. | ||
function createResponseHandler (cb = () => {}) { | ||
return (res) => { | ||
const { statusCode } = res | ||
let callback = (er, statusCode, uri, body) => { | ||
// Sans cb is NOP. | ||
cb(er, statusCode, uri, body) | ||
const onError = (er) => { | ||
invalidate(422, er) | ||
} | ||
debug('request: event names: %s', req.eventNames()) | ||
const onAborted = () => { | ||
invalidate(400) | ||
} | ||
callback = () => {} | ||
} | ||
const onClose = () => { | ||
invalidate(400) | ||
} | ||
log.info('issueing: ( %s, %s )', action, uri) | ||
const onEnd = () => { | ||
invalidate() | ||
} | ||
const req = createRequest(client, action, uri) | ||
let chunks = [] | ||
// Managing the request | ||
const onReadable = () => { | ||
let chunk | ||
while ((chunk = res.read())) { | ||
chunks.push(chunk) | ||
} | ||
} | ||
let requested = () => { | ||
debug('requested') | ||
let invalidate = (sc = statusCode, error) => { | ||
res.removeListener('error', onError) | ||
res.removeListener('aborted', onAborted) | ||
res.removeListener('close', onClose) | ||
res.removeListener('end', onEnd) | ||
res.removeListener('readable', onReadable) | ||
req.removeListener('error', requestFailed) | ||
req.removeListener('finish', requested) | ||
invalidate = () => { | ||
debug(new Error('multiple invalidations')) | ||
} | ||
requested = () => {} | ||
} | ||
const body = Buffer.concat(chunks).toString() | ||
let failed = (statusCode = 400, er) => { | ||
requested() | ||
req.removeListener('response', responseReceived) | ||
req.removeListener('timeout', socketTimeoutExceeded) | ||
log.error('request failed: ( %s, %s )', statusCode, er) | ||
failed = () => {} | ||
callback(er, statusCode, uri, undefined) | ||
} | ||
const requestFailed = (er) => { | ||
failed(400, er) | ||
} | ||
req.once('error', requestFailed) | ||
// Monitoring the socket | ||
const socketTimeoutExceeded = () => { | ||
failed(408) | ||
req.abort() | ||
} | ||
req.once('timeout', socketTimeoutExceeded) | ||
req.setTimeout(client.timeout) | ||
// Receiving the reponse | ||
const responseReceived = (res) => { | ||
// The accumulated payload body. | ||
let body = '' | ||
// Exit here. | ||
let finish = (statusCode = res.statusCode, error) => { | ||
req.removeListener('timeout', socketTimeoutExceeded) | ||
res.removeListener('error', bodyFailed) | ||
res.removeListener('aborted', requestAborted) | ||
res.removeListener('close', connectionClosed) | ||
res.removeListener('end', responseEnded) | ||
res.removeListener('readable', dataAvailable) | ||
finish = () => {} | ||
debug('response: event names: %s', res.eventNames()) | ||
callback(error, statusCode, uri, body) | ||
cb(error, sc, body) | ||
} | ||
const bodyFailed = (er) => { | ||
debug('bodyFailed') | ||
finish(422, er) | ||
const install = () => { | ||
res.once('error', onError) | ||
res.once('aborted', onAborted) | ||
res.once('close', onClose) | ||
res.once('end', onEnd) | ||
res.on('readable', onReadable) | ||
} | ||
const requestAborted = () => { | ||
debug('requestAborted') | ||
finish(400) | ||
} | ||
res.aborted ? invalidate(400) : install() | ||
} | ||
} | ||
const connectionClosed = () => { | ||
debug('connectionClosed') | ||
finish(400) | ||
} | ||
// Issues request with action for uri. When the request-response cycle | ||
// is complete the callback executes. | ||
// | ||
// cb (er, statusCode, body) | ||
// | ||
// The er and the body parameters are optional. | ||
function issue (client, action, uri, cb = () => {}) { | ||
const { log } = client | ||
uri = uri.href || uri | ||
const responseEnded = () => { | ||
debug('responseEnded') | ||
finish() | ||
} | ||
log.info('issueing: ( %s, %s )', action, uri) | ||
res.once('error', bodyFailed) | ||
res.once('aborted', requestAborted) | ||
res.once('close', connectionClosed) | ||
res.once('end', responseEnded) | ||
const req = createRequest(client, action, uri) | ||
if (res.statusCode === 200) { | ||
log.info('success: %s', uri) | ||
} else { | ||
log.warn('failure: %s', res.statusCode) | ||
} | ||
let invalidate = (er, statusCode, body) => { | ||
req.removeListener('response', onResponse) | ||
req.removeListener('timeout', onTimeout) | ||
req.removeListener('error', onError) | ||
if (res.aborted) { | ||
return finish(400) | ||
invalidate = () => { | ||
debug(new Error('multiple invalidations')) | ||
} | ||
// Accumulating the payload body. | ||
debug('request: event names: %s', req.eventNames()) | ||
cb(er, statusCode, body) | ||
} | ||
const dataAvailable = () => { | ||
debug('dataAvailable') | ||
let chunk | ||
const onError = (er) => { | ||
invalidate(er, 400) | ||
} | ||
while ((chunk = res.read())) { | ||
body += chunk | ||
} | ||
} | ||
const onTimeout = () => { | ||
req.abort() | ||
invalidate(new Error('socket timeout'), 400) | ||
} | ||
res.on('readable', dataAvailable) | ||
const onResponse = createResponseHandler(invalidate) | ||
const install = () => { | ||
req.once('timeout', onTimeout) | ||
req.once('error', onError) | ||
req.once('response', onResponse) | ||
} | ||
req.once('response', responseReceived) | ||
req.once('finish', requested) | ||
install() | ||
req.end() | ||
} |
@@ -6,2 +6,10 @@ 'use strict' | ||
function parse (body) { | ||
try { | ||
return JSON.parse(body) | ||
} catch (error) { | ||
return { error: error, notJSON: body } | ||
} | ||
} | ||
// Returns a Transform stream for issueing Edged client actions. Write action | ||
@@ -14,7 +22,5 @@ // objects to this stream and read results objects from it. | ||
return new Transform({ | ||
transform (item, enc, cb) { | ||
const uri = item.uri.href || item.uri | ||
issue(client, item.action, uri, (er, sc, uri, body) => { | ||
this.push({ statusCode: sc, uri: uri, body: body }) | ||
transform ({ action, uri }, enc, cb) { | ||
issue(client, action, uri, (er, sc, body) => { | ||
this.push({ statusCode: sc, uri: uri, body: parse(body) }) | ||
cb(er) | ||
@@ -35,6 +41,4 @@ }) | ||
transform (uri, enc, cb) { | ||
uri = uri.href || uri | ||
issue(client, action, uri, (er, sc, uri, body) => { | ||
this.push({ statusCode: sc, uri: uri, body: body }) | ||
issue(client, action, uri, (er, sc, body) => { | ||
this.push({ statusCode: sc, uri: uri, body: parse(body) }) | ||
cb(er) | ||
@@ -41,0 +45,0 @@ }) |
{ | ||
"name": "edged", | ||
"version": "1.0.1", | ||
"version": "1.0.2", | ||
"description": "Purge items at the edge of the network", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
#!/usr/bin/env node | ||
// repl - access Edged with a REPL | ||
// repl - walk on the edge | ||
@@ -37,2 +37,3 @@ const repl = require('repl') | ||
console.log(inspect(obj, { colors: true })) | ||
server.displayPrompt() | ||
cb() | ||
@@ -39,0 +40,0 @@ }, |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
11206
284