tcp-log-client
Advanced tools
Comparing version 3.1.0 to 4.0.0
181
index.js
@@ -0,37 +1,164 @@ | ||
var EventEmitter = require('events').EventEmitter | ||
var inherits = require('util').inherits | ||
var ndjson = require('ndjson') | ||
var net = require('net') | ||
var pump = require('pump') | ||
var reconnect = require('reconnect-core') | ||
var through2 = require('through2') | ||
var uuid = require('uuid').v4 | ||
module.exports = { | ||
createReadStream: createReadStream, | ||
createWriteStream: createWriteStream | ||
} | ||
module.exports = TCPLogClient | ||
function createReadStream (socket, from) { | ||
var returned = pump( | ||
socket, | ||
ndjson.parse({strict: true}), | ||
through2.obj(function (message, _, done) { | ||
if (message.current === true) returned.emit('current') | ||
else if ('index' in message) { | ||
if ('error' in message) returned.emit('error', message) | ||
else if ('entry' in message) this.push(message) | ||
} | ||
done() | ||
function TCPLogClient (options) { | ||
if (!(this instanceof TCPLogClient)) return new TCPLogClient(options) | ||
var client = this | ||
// Apply default options. | ||
var serverOptions = options.server | ||
var reconnectOptions = options.reconnect || {} | ||
var highestIndexReceived = Number(options.from) || 0 | ||
var keepAlive = Boolean(options.keepalive) || true | ||
var noDelay = Boolean(options.noDelay) || true | ||
// Whether the client is currently connected. | ||
client.connected = false | ||
// When the client is connected, a stream of log entries. | ||
client.readStream = through2.obj(function (chunk, _, done) { | ||
// Advance the last-entry-seen counter. | ||
highestIndexReceived = chunk.index | ||
done(null, chunk) | ||
}) | ||
// The stream provided for consumption by reconnect-core. | ||
client._socketStream = null | ||
// Whether the client has ever successfully connected to the server. | ||
// Affects error event handling. | ||
var everConnected = true | ||
// A UUID-to-function map of callbacks for writes to the log. Used to | ||
// issue callbacks when the server responds with write confirmations. | ||
client._writeCallbacks = {} | ||
// Create a reconnect-core instance for TCP connection to server. | ||
var reconnecter = client._reconnecter = reconnect(function () { | ||
return net.connect(serverOptions) | ||
.setKeepAlive(keepAlive) | ||
.setNoDelay(noDelay) | ||
})(reconnectOptions, function (newSocketStream) { | ||
// Create a stream to filter out entries for reading. | ||
var filterStream = createReadStream(newSocketStream) | ||
// Replace the old duplex stream in the pipeline. | ||
if (client._filterStream) { | ||
client._filterStream.removeAllListeners() | ||
client._filterStream.unpipe() | ||
} | ||
client._filterStream = filterStream | ||
filterStream.pipe(client.readStream, {end: false}) | ||
client._socketStream = newSocketStream | ||
// Issue a read request from one past the last-seen index. | ||
proxyEvent(filterStream, 'current') | ||
var readMessage = {from: highestIndexReceived + 1} | ||
newSocketStream.write(JSON.stringify(readMessage) + '\n') | ||
client.emit('ready') | ||
}) | ||
.on('error', function (error) { | ||
if (!everConnected) client.emit('error', error) | ||
else { | ||
var code = error.code | ||
if (code === 'EPIPE') failPendingWrites('Server closed the connection.') | ||
else if (code === 'ECONNRESET') return | ||
else if (code === 'ECONNREFUSED') return | ||
else client.emit('error', error) | ||
} | ||
}) | ||
proxyEvent(reconnecter, 'disconnect', function () { | ||
if (client.readStream) client.readStream.unpipe() | ||
client.connected = false | ||
failPendingWrites('Disconnected from server.') | ||
}) | ||
proxyEvent(reconnecter, 'reconnect', function () { | ||
client.connected = true | ||
}) | ||
proxyEvent(reconnecter, 'connect', function (connection) { | ||
everConnected = true | ||
client.connected = true | ||
}) | ||
proxyEvent(reconnecter, 'backoff') | ||
proxyEvent(reconnecter, 'fail') | ||
function proxyEvent (emitter, event, optionalCallback) { | ||
emitter.on(event, function () { | ||
if (optionalCallback) optionalCallback.apply(client, arguments) | ||
client.emit(event) | ||
}) | ||
) | ||
returned.socket = socket | ||
returned.from = from | ||
socket.write(JSON.stringify({from: from || 1}) + '\n') | ||
return returned | ||
} | ||
function createReadStream (socket) { | ||
var returned = pump( | ||
socket, | ||
ndjson.parse({strict: true}), | ||
through2.obj(function (message, _, done) { | ||
if (message.current === true) returned.emit('current') | ||
else if ('index' in message) { | ||
if ('error' in message) returned.emit('error', message) | ||
// Pass through log entries. | ||
else if ('entry' in message) { | ||
this.push(message) | ||
// Callback for confirmed writes. | ||
} else if ('id' in message) { | ||
var id = message.id | ||
var callback = client._writeCallbacks[id] | ||
delete client._writeCallbacks[id] | ||
callback(null, message.index) | ||
} | ||
} | ||
done() | ||
}) | ||
) | ||
return returned | ||
} | ||
function failPendingWrites (message) { | ||
var callbacks = client._writeCallbacks | ||
Object.keys(client._writeCallbacks).forEach(function (id) { | ||
var callback = callbacks[id] | ||
delete callbacks[id] | ||
callback(new Error(message)) | ||
}) | ||
} | ||
} | ||
function createWriteStream (socket) { | ||
var returned = through2.obj(function (chunk, _, done) { | ||
done(null, {id: uuid(), entry: chunk}) | ||
}) | ||
returned.socket = socket | ||
pump(returned, ndjson.stringify(), socket) | ||
return returned | ||
inherits(TCPLogClient, EventEmitter) | ||
TCPLogClient.prototype.destroy = function () { | ||
this._reconnecter.reconnect = false | ||
this._reconnecter.disconnect() | ||
this.readStream.end() | ||
} | ||
TCPLogClient.prototype.write = function (entry, callback) { | ||
if (!this.connected) { | ||
throw new Error( | ||
'Cannot write when disconnected. ' + | ||
'Check `client.connected` before calling `client.write()`.' | ||
) | ||
} else { | ||
// Generate a UUID for the write. The server will echo the UUID back to | ||
// confirm the write. | ||
var id = uuid() | ||
this._writeCallbacks[id] = callback || noop | ||
var message = JSON.stringify({id: id, entry: entry}) + '\n' | ||
return this._socketStream.write(message) | ||
} | ||
} | ||
function noop () { } | ||
TCPLogClient.prototype.connect = function () { | ||
this._reconnecter.connect() | ||
return this | ||
} |
{ | ||
"name": "tcp-log-client", | ||
"description": "sync with a tcp-log-server", | ||
"version": "3.1.0", | ||
"description": "stream and write tcp-log-server entries", | ||
"version": "4.0.0", | ||
"author": "Kyle E. Mitchell <kyle@kemitchell.com> (https://kemitchell.com/)", | ||
@@ -9,2 +9,3 @@ "dependencies": { | ||
"pump": "^1.0.1", | ||
"reconnect-core": "^1.3.0", | ||
"through2": "^2.0.1", | ||
@@ -15,2 +16,3 @@ "uuid": "^2.0.2" | ||
"abstract-blob-store": "^3.2.0", | ||
"async.mapseries": "^0.5.2", | ||
"dev-null": "^0.1.1", | ||
@@ -23,5 +25,13 @@ "level-logs": "^1.1.0", | ||
"standard": "^7.1.2", | ||
"stream-set": "^1.1.0", | ||
"tape": "^4.6.0", | ||
"tcp-log-server": "^6.0.0" | ||
}, | ||
"keywords": [ | ||
"TCP", | ||
"client", | ||
"database", | ||
"kappa", | ||
"log" | ||
], | ||
"license": "MIT", | ||
@@ -28,0 +38,0 @@ "repository": "kemitchell/tcp-log-client.js", |
@@ -1,2 +0,2 @@ | ||
Sync with a [tcp-log-server]. | ||
Stream and write [tcp-log-server] entries. Reconnect automatically. | ||
@@ -6,9 +6,44 @@ [tcp-log-server]: https://npmjs.com/packages/tcp-log-server | ||
```javascript | ||
var client = new TCPLogClient({port: port}) | ||
var TCPLogClient = require('tcp-log-client') | ||
client.on('entry', function (entry, index) { }) | ||
var client = new TCPLogClient({ | ||
// Use these options for `require('net').connect(options)`. | ||
server: {port: port}, | ||
// Enable TCP keepalive. Enabled by default. | ||
keepAlive: true, | ||
// Disable the Nagle algorithm. Disabled by default. | ||
noDelay: true, | ||
// Start reading from entry index 1. 1 by default. | ||
from: 1, | ||
// Stop trying to reconnect and fail after 5 attempts. | ||
reconnect: {failAfter: 5} | ||
}) | ||
.on('error', function (error) { | ||
console.error(error) | ||
}) | ||
.on('fail', function () { | ||
console.error('Failed to reconnect.') | ||
}) | ||
.once('ready', function () { | ||
if (client.connected) { | ||
client.write({example: 'entry'}, function (error, index) { | ||
console.log('New entry index is %d', index) | ||
// Permanently disconnect and end `client.readStream`. | ||
client.destroy() | ||
}) | ||
} | ||
}) | ||
client.write({a: 1}, function (error, index) { }) | ||
// Readable stream of log entries. | ||
// Entries added with `client.write()` will be streamed, too. | ||
client.readStream.on('data', function (chunk) { | ||
console.log(chunk.index) | ||
console.log(chunk.entry) | ||
}) | ||
``` | ||
client.disconnect() | ||
``` | ||
See also: | ||
- [net documentation](https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener) for `options.server` | ||
- [reconnect-core](https://www.npmjs.com/package/reconnect-core) for `options.reconnect` | ||
- [migrate-versioned-log](https://www.npmjs.com/package/migrate-versioned-log) for versioning log entries |
265
test.js
var EventEmitter = require('events').EventEmitter | ||
var abs = require('abstract-blob-store') | ||
var client = require('./') | ||
var InMemoryBlobStore = require('abstract-blob-store') | ||
var TCPLogClient = require('./') | ||
var asyncMapSeries = require('async.mapseries') | ||
var devnull = require('dev-null') | ||
@@ -12,6 +13,31 @@ var levelLogs = require('level-logs') | ||
var sha256 = require('sha256') | ||
var streamSet = require('stream-set') | ||
var tape = require('tape') | ||
function startTestServer (callback) { | ||
memdown.clearGlobalStore() | ||
// Use an in-memdown LevelUP storage back-end for testing. | ||
var level = levelup('', {db: memdown}) | ||
var handler = logServerHandler( | ||
// Provide a pino logger, but pipe it to nowhere. | ||
pino({}, devnull()), | ||
levelLogs(level, {valueEncoding: 'json'}), | ||
// Use an in-memory blob store for testing. | ||
new InMemoryBlobStore(), | ||
new EventEmitter(), | ||
sha256 | ||
) | ||
// Track connections so tests can close to simulate problems. | ||
var connections = streamSet() | ||
net.createServer() | ||
.on('connection', function (socket) { connections.add(socket) }) | ||
.on('connection', handler) | ||
.once('close', function () { level.close() }) | ||
.listen(0, function () { | ||
callback(this, this.address().port, connections) | ||
}) | ||
} | ||
tape('start a test server', function (test) { | ||
withTestServer(function (server, port) { | ||
startTestServer(function (server, port) { | ||
test.pass('started a server') | ||
@@ -23,69 +49,208 @@ server.close() | ||
tape('read and write', function (test) { | ||
withTestServer(function (server, port) { | ||
var readStream = client.createReadStream(socket(port)) | ||
.on('data', function (data) { | ||
test.deepEqual(data.entry, {a: 1}, 'event with entry') | ||
test.equal(data.index, 1, 'event with index') | ||
writeStream.end() | ||
readStream.destroy() | ||
var entries = [{a: 1}, {b: 2}, {c: 3}] | ||
tape('read and write from same client', function (test) { | ||
startTestServer(function (server, port) { | ||
var received = [] | ||
var client = new TCPLogClient({server: {port: port}}) | ||
.connect() | ||
.once('ready', function () { | ||
client.readStream.on('data', function (data) { | ||
received.push(data.entry) | ||
if (received.length === entries.length) { | ||
test.deepEqual(received, entries, 'received entries') | ||
client.destroy() | ||
server.close() | ||
test.end() | ||
} | ||
}) | ||
entries.forEach(function (entry) { client.write(entry) }) | ||
}) | ||
}) | ||
}) | ||
tape('writes call back with indices', function (test) { | ||
startTestServer(function (server, port) { | ||
var client = new TCPLogClient({server: {port: port}}) | ||
.connect() | ||
.once('ready', function () { | ||
var expected = [] | ||
var received = [] | ||
asyncMapSeries(entries, function (entry, done) { | ||
client.write(entry, function (error, index) { | ||
test.ifError(error) | ||
expected.push({index: index, entry: entry}) | ||
}) | ||
}) | ||
client.readStream.on('data', function (data) { | ||
received.push(data) | ||
if (received.length === expected.length) { | ||
test.deepEqual(received, expected, 'received entries') | ||
client.destroy() | ||
server.close() | ||
test.end() | ||
} | ||
}) | ||
}) | ||
}) | ||
}) | ||
tape('read another client\'s writes', function (test) { | ||
startTestServer(function (server, port) { | ||
var options = {server: {port: port}} | ||
var reader = new TCPLogClient(options) | ||
.connect() | ||
.once('ready', function () { | ||
var writer = new TCPLogClient(options) | ||
.connect() | ||
.once('ready', function () { | ||
var received = [] | ||
reader.readStream.on('data', function (data) { | ||
received.push(data.entry) | ||
if (received.length === entries.length) { | ||
test.deepEqual(received, entries, 'received entries') | ||
reader.destroy() | ||
writer.destroy() | ||
server.close() | ||
test.end() | ||
} | ||
}) | ||
entries.forEach(function (entry) { writer.write(entry) }) | ||
}) | ||
}) | ||
}) | ||
}) | ||
tape('read another client\'s previous writes', function (test) { | ||
startTestServer(function (server, port) { | ||
var options = {server: {port: port}} | ||
var reader = new TCPLogClient(options) | ||
.connect() | ||
.once('ready', function () { | ||
var writer = new TCPLogClient(options) | ||
.connect() | ||
.once('ready', function () { | ||
entries.forEach(function (entry) { writer.write(entry) }) | ||
setImmediate(function () { | ||
var received = [] | ||
reader.readStream.on('data', function (data) { | ||
received.push(data.entry) | ||
if (received.length === entries.length) { | ||
test.deepEqual(received, entries, 'received entries') | ||
reader.destroy() | ||
writer.destroy() | ||
server.close() | ||
test.end() | ||
} | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
tape('current event', function (test) { | ||
startTestServer(function (server, port) { | ||
var options = {server: {port: port}} | ||
var writer = new TCPLogClient(options) | ||
.connect() | ||
.once('ready', function () { | ||
entries.forEach(function (entry) { writer.write(entry) }) | ||
var reader = new TCPLogClient(options) | ||
.connect() | ||
.once('ready', function () { | ||
reader.once('current', function (data) { | ||
test.pass('current event emitted') | ||
reader.destroy() | ||
writer.destroy() | ||
server.close() | ||
test.end() | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
tape('reconnect', function (test) { | ||
startTestServer(function (server, port, connections) { | ||
var options = {server: {port: port}} | ||
var client = new TCPLogClient(options) | ||
.connect() | ||
.once('ready', function () { destroyAll(connections) }) | ||
.once('reconnect', function () { | ||
test.pass('reconnected') | ||
client.destroy() | ||
server.close() | ||
test.end() | ||
}) | ||
var writeStream = client.createWriteStream(socket(port)) | ||
writeStream.write({a: 1}) | ||
}) | ||
}) | ||
tape('read previous writes', function (test) { | ||
withTestServer(function (server, port) { | ||
var writeStream = client.createWriteStream(socket(port)) | ||
var entries = [{a: 1}, {b: 2}, {c: 3}] | ||
tape('read after reconnect', function (test) { | ||
startTestServer(function (server, port, connections) { | ||
var options = {server: {port: port}} | ||
var received = [] | ||
var receivedCurrent = false | ||
var receivedEntries = false | ||
var readStream = client.createReadStream(socket(port)) | ||
.once('current', function () { | ||
test.pass('current event') | ||
receivedCurrent = true | ||
done() | ||
var client = new TCPLogClient(options) | ||
.connect() | ||
.once('ready', function () { | ||
client.write(entries[0], function (error) { | ||
test.ifError(error, 'no error') | ||
client.once('ready', function () { | ||
client.write(entries[1]) | ||
client.write(entries[2]) | ||
}) | ||
destroyAll(connections) | ||
}) | ||
}) | ||
.on('data', function (data) { | ||
client.readStream.on('data', function (data) { | ||
received.push(data.entry) | ||
if (received.length === entries.length) { | ||
test.deepEqual(received, entries, 'received entries') | ||
receivedEntries = true | ||
done() | ||
} | ||
}) | ||
function done () { | ||
if (receivedEntries && receivedCurrent) { | ||
writeStream.end() | ||
readStream.destroy() | ||
client.destroy() | ||
server.close() | ||
test.end() | ||
} | ||
}) | ||
}) | ||
}) | ||
tape('fail', function (test) { | ||
startTestServer(function (server, port, connections) { | ||
var options = { | ||
server: {port: port}, | ||
reconnect: {initialDelay: 1, maxDelay: 2, failAfter: 1} | ||
} | ||
entries.forEach(function (entry) { | ||
writeStream.write(entry) | ||
var client = new TCPLogClient(options) | ||
.connect() | ||
.once('ready', function () { | ||
server.close() | ||
destroyAll(connections) | ||
}) | ||
.once('fail', function () { | ||
test.pass('fail') | ||
client.destroy() | ||
server.close() | ||
test.end() | ||
}) | ||
}) | ||
}) | ||
function socket (port) { | ||
return net.connect(port).setKeepAlive(true) | ||
} | ||
tape('destroy ends read stream', function (test) { | ||
startTestServer(function (server, port, connections) { | ||
var options = {server: {port: port}} | ||
var client = new TCPLogClient(options) | ||
.connect() | ||
.once('ready', function () { client.destroy() }) | ||
client.readStream.once('finish', function () { | ||
test.pass('stream ended') | ||
server.close() | ||
test.end() | ||
}) | ||
}) | ||
}) | ||
function withTestServer (callback) { | ||
memdown.clearGlobalStore() | ||
var level = levelup('', {db: memdown}) | ||
var logs = levelLogs(level, {valueEncoding: 'json'}) | ||
var blobs = abs() | ||
var log = pino({}, devnull()) | ||
var emitter = new EventEmitter() | ||
var handler = logServerHandler(log, logs, blobs, emitter, sha256) | ||
var server = net.createServer() | ||
.on('connection', handler) | ||
.once('close', function () { level.close() }) | ||
.listen(0, function () { callback(server, this.address().port) }) | ||
function destroyAll (connections) { | ||
connections.forEach(function (connection) { | ||
connection.destroy() | ||
}) | ||
} |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
16235
386
49
5
12
3
+ Addedreconnect-core@^1.3.0
+ Addedbackoff@2.5.0(transitive)
+ Addedprecond@0.2.3(transitive)
+ Addedreconnect-core@1.3.0(transitive)