Socket
Socket
Sign inDemoInstall

tcp-log-client

Package Overview
Dependencies
23
Maintainers
1
Versions
7
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 3.1.0 to 4.0.0

181

index.js

@@ -0,37 +1,164 @@

var EventEmitter = require('events').EventEmitter
var inherits = require('util').inherits
var ndjson = require('ndjson')
var net = require('net')
var pump = require('pump')
var reconnect = require('reconnect-core')
var through2 = require('through2')
var uuid = require('uuid').v4
module.exports = {
createReadStream: createReadStream,
createWriteStream: createWriteStream
}
module.exports = TCPLogClient
function createReadStream (socket, from) {
var returned = pump(
socket,
ndjson.parse({strict: true}),
through2.obj(function (message, _, done) {
if (message.current === true) returned.emit('current')
else if ('index' in message) {
if ('error' in message) returned.emit('error', message)
else if ('entry' in message) this.push(message)
}
done()
function TCPLogClient (options) {
if (!(this instanceof TCPLogClient)) return new TCPLogClient(options)
var client = this
// Apply default options.
var serverOptions = options.server
var reconnectOptions = options.reconnect || {}
var highestIndexReceived = Number(options.from) || 0
var keepAlive = Boolean(options.keepalive) || true
var noDelay = Boolean(options.noDelay) || true
// Whether the client is currently connected.
client.connected = false
// When the client is connected, a stream of log entries.
client.readStream = through2.obj(function (chunk, _, done) {
// Advance the last-entry-seen counter.
highestIndexReceived = chunk.index
done(null, chunk)
})
// The stream provided for consumption by reconnect-core.
client._socketStream = null
// Whether the client has ever successfully connected to the server.
// Affects error event handling.
var everConnected = true
// A UUID-to-function map of callbacks for writes to the log. Used to
// issue callbacks when the server responds with write confirmations.
client._writeCallbacks = {}
// Create a reconnect-core instance for TCP connection to server.
var reconnecter = client._reconnecter = reconnect(function () {
return net.connect(serverOptions)
.setKeepAlive(keepAlive)
.setNoDelay(noDelay)
})(reconnectOptions, function (newSocketStream) {
// Create a stream to filter out entries for reading.
var filterStream = createReadStream(newSocketStream)
// Replace the old duplex stream in the pipeline.
if (client._filterStream) {
client._filterStream.removeAllListeners()
client._filterStream.unpipe()
}
client._filterStream = filterStream
filterStream.pipe(client.readStream, {end: false})
client._socketStream = newSocketStream
// Issue a read request from one past the last-seen index.
proxyEvent(filterStream, 'current')
var readMessage = {from: highestIndexReceived + 1}
newSocketStream.write(JSON.stringify(readMessage) + '\n')
client.emit('ready')
})
.on('error', function (error) {
if (!everConnected) client.emit('error', error)
else {
var code = error.code
if (code === 'EPIPE') failPendingWrites('Server closed the connection.')
else if (code === 'ECONNRESET') return
else if (code === 'ECONNREFUSED') return
else client.emit('error', error)
}
})
proxyEvent(reconnecter, 'disconnect', function () {
if (client.readStream) client.readStream.unpipe()
client.connected = false
failPendingWrites('Disconnected from server.')
})
proxyEvent(reconnecter, 'reconnect', function () {
client.connected = true
})
proxyEvent(reconnecter, 'connect', function (connection) {
everConnected = true
client.connected = true
})
proxyEvent(reconnecter, 'backoff')
proxyEvent(reconnecter, 'fail')
function proxyEvent (emitter, event, optionalCallback) {
emitter.on(event, function () {
if (optionalCallback) optionalCallback.apply(client, arguments)
client.emit(event)
})
)
returned.socket = socket
returned.from = from
socket.write(JSON.stringify({from: from || 1}) + '\n')
return returned
}
function createReadStream (socket) {
var returned = pump(
socket,
ndjson.parse({strict: true}),
through2.obj(function (message, _, done) {
if (message.current === true) returned.emit('current')
else if ('index' in message) {
if ('error' in message) returned.emit('error', message)
// Pass through log entries.
else if ('entry' in message) {
this.push(message)
// Callback for confirmed writes.
} else if ('id' in message) {
var id = message.id
var callback = client._writeCallbacks[id]
delete client._writeCallbacks[id]
callback(null, message.index)
}
}
done()
})
)
return returned
}
function failPendingWrites (message) {
var callbacks = client._writeCallbacks
Object.keys(client._writeCallbacks).forEach(function (id) {
var callback = callbacks[id]
delete callbacks[id]
callback(new Error(message))
})
}
}
function createWriteStream (socket) {
var returned = through2.obj(function (chunk, _, done) {
done(null, {id: uuid(), entry: chunk})
})
returned.socket = socket
pump(returned, ndjson.stringify(), socket)
return returned
inherits(TCPLogClient, EventEmitter)
TCPLogClient.prototype.destroy = function () {
this._reconnecter.reconnect = false
this._reconnecter.disconnect()
this.readStream.end()
}
TCPLogClient.prototype.write = function (entry, callback) {
if (!this.connected) {
throw new Error(
'Cannot write when disconnected. ' +
'Check `client.connected` before calling `client.write()`.'
)
} else {
// Generate a UUID for the write. The server will echo the UUID back to
// confirm the write.
var id = uuid()
this._writeCallbacks[id] = callback || noop
var message = JSON.stringify({id: id, entry: entry}) + '\n'
return this._socketStream.write(message)
}
}
function noop () { }
TCPLogClient.prototype.connect = function () {
this._reconnecter.connect()
return this
}
{
"name": "tcp-log-client",
"description": "sync with a tcp-log-server",
"version": "3.1.0",
"description": "stream and write tcp-log-server entries",
"version": "4.0.0",
"author": "Kyle E. Mitchell <kyle@kemitchell.com> (https://kemitchell.com/)",

@@ -9,2 +9,3 @@ "dependencies": {

"pump": "^1.0.1",
"reconnect-core": "^1.3.0",
"through2": "^2.0.1",

@@ -15,2 +16,3 @@ "uuid": "^2.0.2"

"abstract-blob-store": "^3.2.0",
"async.mapseries": "^0.5.2",
"dev-null": "^0.1.1",

@@ -23,5 +25,13 @@ "level-logs": "^1.1.0",

"standard": "^7.1.2",
"stream-set": "^1.1.0",
"tape": "^4.6.0",
"tcp-log-server": "^6.0.0"
},
"keywords": [
"TCP",
"client",
"database",
"kappa",
"log"
],
"license": "MIT",

@@ -28,0 +38,0 @@ "repository": "kemitchell/tcp-log-client.js",

@@ -1,2 +0,2 @@

Sync with a [tcp-log-server].
Stream and write [tcp-log-server] entries. Reconnect automatically.

@@ -6,9 +6,44 @@ [tcp-log-server]: https://npmjs.com/packages/tcp-log-server

```javascript
var client = new TCPLogClient({port: port})
var TCPLogClient = require('tcp-log-client')
client.on('entry', function (entry, index) { })
var client = new TCPLogClient({
// Use these options for `require('net').connect(options)`.
server: {port: port},
// Enable TCP keepalive. Enabled by default.
keepAlive: true,
// Disable the Nagle algorithm. Disabled by default.
noDelay: true,
// Start reading from entry index 1. 1 by default.
from: 1,
// Stop trying to reconnect and fail after 5 attempts.
reconnect: {failAfter: 5}
})
.on('error', function (error) {
console.error(error)
})
.on('fail', function () {
console.error('Failed to reconnect.')
})
.once('ready', function () {
if (client.connected) {
client.write({example: 'entry'}, function (error, index) {
console.log('New entry index is %d', index)
// Permanently disconnect and end `client.readStream`.
client.destroy()
})
}
})
client.write({a: 1}, function (error, index) { })
// Readable stream of log entries.
// Entries added with `client.write()` will be streamed, too.
client.readStream.on('data', function (chunk) {
console.log(chunk.index)
console.log(chunk.entry)
})
```
client.disconnect()
```
See also:
- [net documentation](https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener) for `options.server`
- [reconnect-core](https://www.npmjs.com/package/reconnect-core) for `options.reconnect`
- [migrate-versioned-log](https://www.npmjs.com/package/migrate-versioned-log) for versioning log entries
var EventEmitter = require('events').EventEmitter
var abs = require('abstract-blob-store')
var client = require('./')
var InMemoryBlobStore = require('abstract-blob-store')
var TCPLogClient = require('./')
var asyncMapSeries = require('async.mapseries')
var devnull = require('dev-null')

@@ -12,6 +13,31 @@ var levelLogs = require('level-logs')

var sha256 = require('sha256')
var streamSet = require('stream-set')
var tape = require('tape')
function startTestServer (callback) {
memdown.clearGlobalStore()
// Use an in-memdown LevelUP storage back-end for testing.
var level = levelup('', {db: memdown})
var handler = logServerHandler(
// Provide a pino logger, but pipe it to nowhere.
pino({}, devnull()),
levelLogs(level, {valueEncoding: 'json'}),
// Use an in-memory blob store for testing.
new InMemoryBlobStore(),
new EventEmitter(),
sha256
)
// Track connections so tests can close to simulate problems.
var connections = streamSet()
net.createServer()
.on('connection', function (socket) { connections.add(socket) })
.on('connection', handler)
.once('close', function () { level.close() })
.listen(0, function () {
callback(this, this.address().port, connections)
})
}
tape('start a test server', function (test) {
withTestServer(function (server, port) {
startTestServer(function (server, port) {
test.pass('started a server')

@@ -23,69 +49,208 @@ server.close()

tape('read and write', function (test) {
withTestServer(function (server, port) {
var readStream = client.createReadStream(socket(port))
.on('data', function (data) {
test.deepEqual(data.entry, {a: 1}, 'event with entry')
test.equal(data.index, 1, 'event with index')
writeStream.end()
readStream.destroy()
var entries = [{a: 1}, {b: 2}, {c: 3}]
tape('read and write from same client', function (test) {
startTestServer(function (server, port) {
var received = []
var client = new TCPLogClient({server: {port: port}})
.connect()
.once('ready', function () {
client.readStream.on('data', function (data) {
received.push(data.entry)
if (received.length === entries.length) {
test.deepEqual(received, entries, 'received entries')
client.destroy()
server.close()
test.end()
}
})
entries.forEach(function (entry) { client.write(entry) })
})
})
})
tape('writes call back with indices', function (test) {
startTestServer(function (server, port) {
var client = new TCPLogClient({server: {port: port}})
.connect()
.once('ready', function () {
var expected = []
var received = []
asyncMapSeries(entries, function (entry, done) {
client.write(entry, function (error, index) {
test.ifError(error)
expected.push({index: index, entry: entry})
})
})
client.readStream.on('data', function (data) {
received.push(data)
if (received.length === expected.length) {
test.deepEqual(received, expected, 'received entries')
client.destroy()
server.close()
test.end()
}
})
})
})
})
tape('read another client\'s writes', function (test) {
startTestServer(function (server, port) {
var options = {server: {port: port}}
var reader = new TCPLogClient(options)
.connect()
.once('ready', function () {
var writer = new TCPLogClient(options)
.connect()
.once('ready', function () {
var received = []
reader.readStream.on('data', function (data) {
received.push(data.entry)
if (received.length === entries.length) {
test.deepEqual(received, entries, 'received entries')
reader.destroy()
writer.destroy()
server.close()
test.end()
}
})
entries.forEach(function (entry) { writer.write(entry) })
})
})
})
})
tape('read another client\'s previous writes', function (test) {
startTestServer(function (server, port) {
var options = {server: {port: port}}
var reader = new TCPLogClient(options)
.connect()
.once('ready', function () {
var writer = new TCPLogClient(options)
.connect()
.once('ready', function () {
entries.forEach(function (entry) { writer.write(entry) })
setImmediate(function () {
var received = []
reader.readStream.on('data', function (data) {
received.push(data.entry)
if (received.length === entries.length) {
test.deepEqual(received, entries, 'received entries')
reader.destroy()
writer.destroy()
server.close()
test.end()
}
})
})
})
})
})
})
tape('current event', function (test) {
startTestServer(function (server, port) {
var options = {server: {port: port}}
var writer = new TCPLogClient(options)
.connect()
.once('ready', function () {
entries.forEach(function (entry) { writer.write(entry) })
var reader = new TCPLogClient(options)
.connect()
.once('ready', function () {
reader.once('current', function (data) {
test.pass('current event emitted')
reader.destroy()
writer.destroy()
server.close()
test.end()
})
})
})
})
})
tape('reconnect', function (test) {
startTestServer(function (server, port, connections) {
var options = {server: {port: port}}
var client = new TCPLogClient(options)
.connect()
.once('ready', function () { destroyAll(connections) })
.once('reconnect', function () {
test.pass('reconnected')
client.destroy()
server.close()
test.end()
})
var writeStream = client.createWriteStream(socket(port))
writeStream.write({a: 1})
})
})
tape('read previous writes', function (test) {
withTestServer(function (server, port) {
var writeStream = client.createWriteStream(socket(port))
var entries = [{a: 1}, {b: 2}, {c: 3}]
tape('read after reconnect', function (test) {
startTestServer(function (server, port, connections) {
var options = {server: {port: port}}
var received = []
var receivedCurrent = false
var receivedEntries = false
var readStream = client.createReadStream(socket(port))
.once('current', function () {
test.pass('current event')
receivedCurrent = true
done()
var client = new TCPLogClient(options)
.connect()
.once('ready', function () {
client.write(entries[0], function (error) {
test.ifError(error, 'no error')
client.once('ready', function () {
client.write(entries[1])
client.write(entries[2])
})
destroyAll(connections)
})
})
.on('data', function (data) {
client.readStream.on('data', function (data) {
received.push(data.entry)
if (received.length === entries.length) {
test.deepEqual(received, entries, 'received entries')
receivedEntries = true
done()
}
})
function done () {
if (receivedEntries && receivedCurrent) {
writeStream.end()
readStream.destroy()
client.destroy()
server.close()
test.end()
}
})
})
})
tape('fail', function (test) {
startTestServer(function (server, port, connections) {
var options = {
server: {port: port},
reconnect: {initialDelay: 1, maxDelay: 2, failAfter: 1}
}
entries.forEach(function (entry) {
writeStream.write(entry)
var client = new TCPLogClient(options)
.connect()
.once('ready', function () {
server.close()
destroyAll(connections)
})
.once('fail', function () {
test.pass('fail')
client.destroy()
server.close()
test.end()
})
})
})
function socket (port) {
return net.connect(port).setKeepAlive(true)
}
tape('destroy ends read stream', function (test) {
startTestServer(function (server, port, connections) {
var options = {server: {port: port}}
var client = new TCPLogClient(options)
.connect()
.once('ready', function () { client.destroy() })
client.readStream.once('finish', function () {
test.pass('stream ended')
server.close()
test.end()
})
})
})
function withTestServer (callback) {
memdown.clearGlobalStore()
var level = levelup('', {db: memdown})
var logs = levelLogs(level, {valueEncoding: 'json'})
var blobs = abs()
var log = pino({}, devnull())
var emitter = new EventEmitter()
var handler = logServerHandler(log, logs, blobs, emitter, sha256)
var server = net.createServer()
.on('connection', handler)
.once('close', function () { level.close() })
.listen(0, function () { callback(server, this.address().port) })
function destroyAll (connections) {
connections.forEach(function (connection) {
connection.destroy()
})
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc