Comparing version 6.0.0-alpha.13 to 6.0.0-alpha.14
@@ -1,3 +0,1 @@ | ||
/* jshint browserify: true */ | ||
'use strict'; | ||
@@ -11,25 +9,8 @@ | ||
var types = require('../../lib/types'); | ||
let types = require('../../lib/types'); | ||
/** Basic parse method, only supporting JSON parsing. */ | ||
function parse(any, opts) { | ||
var schema; | ||
if (typeof any == 'string') { | ||
try { | ||
schema = JSON.parse(any); | ||
} catch (err) { | ||
schema = any; | ||
} | ||
} else { | ||
schema = any; | ||
} | ||
return types.Type.forSchema(schema, opts); | ||
} | ||
module.exports = { | ||
Type: types.Type, | ||
parse: parse, | ||
types: types.builtins | ||
types: types.builtins, | ||
}; |
@@ -1,3 +0,1 @@ | ||
/* jshint browser: true, node: true */ | ||
'use strict'; | ||
@@ -12,59 +10,58 @@ | ||
var avroServices = require('./avsc-services'), | ||
containers = require('../../lib/containers'), | ||
utils = require('../../lib/utils'), | ||
stream = require('stream'), | ||
util = require('util'); | ||
let containers = require('../../lib/containers'), | ||
types = require('../../lib/types'), | ||
stream = require('stream'); | ||
/** Transform stream which lazily reads a blob's contents. */ | ||
function BlobReader(blob, opts) { | ||
stream.Readable.call(this); | ||
opts = opts || {}; | ||
class BlobReader extends stream.Readable { | ||
constructor (blob, opts) { | ||
super(); | ||
opts = opts || {}; | ||
this._batchSize = opts.batchSize || 65536; | ||
this._blob = blob; | ||
this._pos = 0; | ||
} | ||
util.inherits(BlobReader, stream.Readable); | ||
BlobReader.prototype._read = function () { | ||
var pos = this._pos; | ||
if (pos >= this._blob.size) { | ||
this.push(null); | ||
return; | ||
this._batchSize = opts.batchSize || 65536; | ||
this._blob = blob; | ||
this._pos = 0; | ||
} | ||
this._pos += this._batchSize; | ||
var blob = this._blob.slice(pos, this._pos, this._blob.type); | ||
var reader = new FileReader(); | ||
var self = this; | ||
reader.addEventListener('loadend', function cb(evt) { | ||
reader.removeEventListener('loadend', cb, false); | ||
if (evt.error) { | ||
self.emit('error', evt.error); | ||
} else { | ||
self.push(utils.bufferFrom(reader.result)); | ||
_read () { | ||
let pos = this._pos; | ||
if (pos >= this._blob.size) { | ||
this.push(null); | ||
return; | ||
} | ||
}, false); | ||
reader.readAsArrayBuffer(blob); | ||
}; | ||
/** Transform stream which builds a blob from all data written to it. */ | ||
function BlobWriter() { | ||
stream.Transform.call(this, {readableObjectMode: true}); | ||
this._bufs = []; | ||
this._pos += this._batchSize; | ||
let blob = this._blob.slice(pos, this._pos, this._blob.type); | ||
let reader = new FileReader(); | ||
let self = this; | ||
reader.addEventListener('loadend', function cb(evt) { | ||
reader.removeEventListener('loadend', cb, false); | ||
if (evt.error) { | ||
self.emit('error', evt.error); | ||
} else { | ||
self.push(reader.result); | ||
} | ||
}, false); | ||
reader.readAsArrayBuffer(blob); | ||
} | ||
} | ||
util.inherits(BlobWriter, stream.Transform); | ||
BlobWriter.prototype._transform = function (buf, encoding, cb) { | ||
this._bufs.push(buf); | ||
cb(); | ||
}; | ||
/** Transform stream which builds a blob from all data written to it. */ | ||
class BlobWriter extends stream.Transform { | ||
constructor () { | ||
super({readableObjectMode: true}); | ||
this._bufs = []; | ||
} | ||
BlobWriter.prototype._flush = function (cb) { | ||
this.push(new Blob(this._bufs, {type: 'application/octet-binary'})); | ||
cb(); | ||
}; | ||
_transform (buf, encoding, cb) { | ||
this._bufs.push(buf); | ||
cb(); | ||
} | ||
_flush (cb) { | ||
this.push(new Blob(this._bufs, {type: 'application/octet-binary'})); | ||
cb(); | ||
} | ||
} | ||
/** Read an Avro-container stored as a blob. */ | ||
@@ -81,4 +78,4 @@ function createBlobDecoder(blob, opts) { | ||
function createBlobEncoder(schema, opts) { | ||
var encoder = new containers.streams.BlockEncoder(schema, opts); | ||
var builder = new BlobWriter(); | ||
let encoder = new containers.streams.BlockEncoder(schema, opts); | ||
let builder = new BlobWriter(); | ||
encoder.pipe(builder); | ||
@@ -91,3 +88,3 @@ return new stream.Duplex({ | ||
// It's also likely impractical to create very large blobs. | ||
var val = builder.read(); | ||
let val = builder.read(); | ||
if (val) { | ||
@@ -98,3 +95,3 @@ done(val); | ||
} | ||
var self = this; | ||
let self = this; | ||
function done(val) { | ||
@@ -108,3 +105,3 @@ self.push(val || builder.read()); | ||
} | ||
}).on('finish', function () { encoder.end(); }); | ||
}).on('finish', () => { encoder.end(); }); | ||
} | ||
@@ -114,7 +111,7 @@ | ||
module.exports = { | ||
createBlobDecoder: createBlobDecoder, | ||
createBlobEncoder: createBlobEncoder, | ||
streams: containers.streams | ||
createBlobDecoder, | ||
createBlobEncoder, | ||
streams: containers.streams, | ||
Type: types.Type, | ||
types: types.builtins, | ||
}; | ||
utils.copyOwnProperties(avroServices, module.exports); |
@@ -1,3 +0,1 @@ | ||
/* jshint node: true */ | ||
'use strict'; | ||
@@ -10,3 +8,3 @@ | ||
function createImportHook() { | ||
return function (fpath, kind, cb) { cb(createError()); }; | ||
return function (_, cb) { cb(createError()); }; | ||
} | ||
@@ -18,8 +16,8 @@ | ||
function tryReadFileSync() { return null; } | ||
module.exports = { | ||
createImportHook: createImportHook, | ||
createSyncImportHook: createSyncImportHook, | ||
existsSync: function () { return false; }, | ||
readFileSync: function () { throw createError(); } | ||
createImportHook, | ||
createSyncImportHook, | ||
tryReadFileSync, | ||
}; |
@@ -1,3 +0,1 @@ | ||
/* jshint node: true */ | ||
// TODO: Add streams which prefix each record with its length. | ||
@@ -16,16 +14,16 @@ | ||
var types = require('./types'), | ||
let types = require('./types'), | ||
utils = require('./utils'), | ||
stream = require('stream'), | ||
util = require('util'), | ||
zlib = require('zlib'); | ||
stream = require('stream'); | ||
const DECODER = new TextDecoder(); | ||
const ENCODER = new TextEncoder(); | ||
var OPTS = {namespace: 'org.apache.avro.file'}; | ||
let OPTS = {namespace: 'org.apache.avro.file', registry: {}}; | ||
var LONG_TYPE = types.Type.forSchema('long', OPTS); | ||
let LONG_TYPE = types.Type.forSchema('long', OPTS); | ||
var MAP_BYTES_TYPE = types.Type.forSchema({type: 'map', values: 'bytes'}, OPTS); | ||
let MAP_BYTES_TYPE = types.Type.forSchema({type: 'map', values: 'bytes'}, OPTS); | ||
var HEADER_TYPE = types.Type.forSchema({ | ||
let HEADER_TYPE = types.Type.forSchema({ | ||
name: 'Header', | ||
@@ -40,3 +38,3 @@ type: 'record', | ||
var BLOCK_TYPE = types.Type.forSchema({ | ||
let BLOCK_TYPE = types.Type.forSchema({ | ||
name: 'Block', | ||
@@ -52,306 +50,327 @@ type: 'record', | ||
// First 4 bytes of an Avro object container file. | ||
var MAGIC_BYTES = utils.bufferFrom('Obj\x01'); | ||
let MAGIC_BYTES = ENCODER.encode('Obj\x01'); | ||
// Convenience. | ||
var f = util.format; | ||
var Tap = utils.Tap; | ||
let Tap = utils.Tap; | ||
/** Duplex stream for decoding fragments. */ | ||
function RawDecoder(schema, opts) { | ||
opts = opts || {}; | ||
class RawDecoder extends stream.Duplex { | ||
constructor (schema, opts) { | ||
opts = opts || {}; | ||
var noDecode = !!opts.noDecode; | ||
stream.Duplex.call(this, { | ||
readableObjectMode: !noDecode, | ||
allowHalfOpen: false | ||
}); | ||
let noDecode = !!opts.noDecode; | ||
super({ | ||
readableObjectMode: !noDecode, | ||
allowHalfOpen: false | ||
}); | ||
this._type = types.Type.forSchema(schema); | ||
this._tap = new Tap(utils.newBuffer(0)); | ||
this._writeCb = null; | ||
this._needPush = false; | ||
this._readValue = createReader(noDecode, this._type); | ||
this._finished = false; | ||
this._type = types.Type.forSchema(schema); | ||
this._tap = Tap.withCapacity(0); | ||
this._writeCb = null; | ||
this._needPush = false; | ||
this._readValue = createReader(noDecode, this._type); | ||
this._finished = false; | ||
this.on('finish', function () { | ||
this._finished = true; | ||
this._read(); | ||
}); | ||
} | ||
util.inherits(RawDecoder, stream.Duplex); | ||
RawDecoder.prototype._write = function (chunk, encoding, cb) { | ||
// Store the write callback and call it when we are done decoding all records | ||
// in this chunk. If we call it right away, we risk loading the entire input | ||
// in memory. We only need to store the latest callback since the stream API | ||
// guarantees that `_write` won't be called again until we call the previous. | ||
this._writeCb = cb; | ||
var tap = this._tap; | ||
tap.buf = Buffer.concat([tap.buf.slice(tap.pos), chunk]); | ||
tap.pos = 0; | ||
if (this._needPush) { | ||
this._needPush = false; | ||
this._read(); | ||
this.on('finish', function () { | ||
this._finished = true; | ||
this._read(); | ||
}); | ||
} | ||
}; | ||
RawDecoder.prototype._read = function () { | ||
this._needPush = false; | ||
_write (chunk, encoding, cb) { | ||
// Store the write callback and call it when we are done decoding all | ||
// records in this chunk. If we call it right away, we risk loading the | ||
// entire input in memory. We only need to store the latest callback since | ||
// the stream API guarantees that `_write` won't be called again until we | ||
// call the previous. | ||
this._writeCb = cb; | ||
var tap = this._tap; | ||
var pos = tap.pos; | ||
var val = this._readValue(tap); | ||
if (tap.isValid()) { | ||
this.push(val); | ||
} else if (!this._finished) { | ||
tap.pos = pos; | ||
this._needPush = true; | ||
if (this._writeCb) { | ||
// This should only ever be false on the first read, and only if it | ||
// happens before the first write. | ||
this._writeCb(); | ||
let tap = this._tap; | ||
tap.forward(chunk); | ||
if (this._needPush) { | ||
this._needPush = false; | ||
this._read(); | ||
} | ||
} else { | ||
this.push(null); | ||
} | ||
}; | ||
_read () { | ||
this._needPush = false; | ||
/** Duplex stream for decoding object container files. */ | ||
function BlockDecoder(opts) { | ||
opts = opts || {}; | ||
var noDecode = !!opts.noDecode; | ||
stream.Duplex.call(this, { | ||
allowHalfOpen: true, // For async decompressors. | ||
readableObjectMode: !noDecode | ||
}); | ||
this._rType = opts.readerSchema !== undefined ? | ||
types.Type.forSchema(opts.readerSchema) : | ||
undefined; | ||
this._wType = null; | ||
this._codecs = opts.codecs; | ||
this._codec = undefined; | ||
this._parseHook = opts.parseHook; | ||
this._tap = new Tap(utils.newBuffer(0)); | ||
this._blockTap = new Tap(utils.newBuffer(0)); | ||
this._syncMarker = null; | ||
this._readValue = null; | ||
this._noDecode = noDecode; | ||
this._queue = new utils.OrderedQueue(); | ||
this._decompress = null; // Decompression function. | ||
this._index = 0; // Next block index. | ||
this._remaining = undefined; // In the current block. | ||
this._needPush = false; | ||
this._finished = false; | ||
this.on('finish', function () { | ||
this._finished = true; | ||
if (this._needPush) { | ||
this._read(); | ||
let tap = this._tap; | ||
let pos = tap.pos; | ||
let val = this._readValue(tap); | ||
if (tap.isValid()) { | ||
this.push(val); | ||
} else if (!this._finished) { | ||
tap.pos = pos; | ||
this._needPush = true; | ||
if (this._writeCb) { | ||
// This should only ever be false on the first read, and only if it | ||
// happens before the first write. | ||
this._writeCb(); | ||
} | ||
} else { | ||
this.push(null); | ||
} | ||
}); | ||
} | ||
} | ||
util.inherits(BlockDecoder, stream.Duplex); | ||
BlockDecoder.defaultCodecs = function () { | ||
return { | ||
'null': function (buf, cb) { cb(null, buf); }, | ||
'deflate': zlib.inflateRaw | ||
}; | ||
}; | ||
/** Duplex stream for decoding object container files. */ | ||
class BlockDecoder extends stream.Duplex { | ||
constructor (opts) { | ||
opts = opts || {}; | ||
BlockDecoder.getDefaultCodecs = BlockDecoder.defaultCodecs; | ||
let noDecode = !!opts.noDecode; | ||
super({ | ||
allowHalfOpen: true, // For async decompressors. | ||
readableObjectMode: !noDecode | ||
}); | ||
BlockDecoder.prototype._decodeHeader = function () { | ||
var tap = this._tap; | ||
if (tap.buf.length < MAGIC_BYTES.length) { | ||
// Wait until more data arrives. | ||
return false; | ||
} | ||
this._rType = opts.readerSchema !== undefined ? | ||
types.Type.forSchema(opts.readerSchema) : | ||
undefined; | ||
this._wType = null; | ||
this._codecs = opts.codecs; | ||
this._codec = undefined; | ||
this._parseHook = opts.parseHook; | ||
this._tap = Tap.withCapacity(0); | ||
this._blockTap = Tap.withCapacity(0); | ||
this._syncMarker = null; | ||
this._readValue = null; | ||
this._noDecode = noDecode; | ||
this._queue = new utils.OrderedQueue(); | ||
this._decompress = null; // Decompression function. | ||
this._index = 0; // Next block index. | ||
this._remaining = undefined; // In the current block. | ||
this._needPush = false; | ||
this._finished = false; | ||
if (!MAGIC_BYTES.equals(tap.buf.slice(0, MAGIC_BYTES.length))) { | ||
this.emit('error', new Error('invalid magic bytes')); | ||
return false; | ||
this.on('finish', function () { | ||
this._finished = true; | ||
if (this._needPush) { | ||
this._read(); | ||
} | ||
}); | ||
} | ||
var header = HEADER_TYPE._read(tap); | ||
if (!tap.isValid()) { | ||
return false; | ||
static defaultCodecs () { | ||
return { | ||
'null': function (buf, cb) { cb(null, buf); } | ||
}; | ||
} | ||
this._codec = (header.meta['avro.codec'] || 'null').toString(); | ||
var codecs = this._codecs || BlockDecoder.getDefaultCodecs(); | ||
this._decompress = codecs[this._codec]; | ||
if (!this._decompress) { | ||
this.emit('error', new Error(f('unknown codec: %s', this._codec))); | ||
return; | ||
static getDefaultCodecs () { | ||
return BlockDecoder.defaultCodecs(); | ||
} | ||
try { | ||
var schema = JSON.parse(header.meta['avro.schema'].toString()); | ||
if (this._parseHook) { | ||
schema = this._parseHook(schema); | ||
_decodeHeader () { | ||
let tap = this._tap; | ||
if (tap.length < MAGIC_BYTES.length) { | ||
// Wait until more data arrives. | ||
return false; | ||
} | ||
this._wType = types.Type.forSchema(schema); | ||
} catch (err) { | ||
this.emit('error', err); | ||
return; | ||
} | ||
this._readValue = createReader(this._noDecode, this._wType, this._rType); | ||
this._syncMarker = header.sync; | ||
this.emit('metadata', this._wType, this._codec, header); | ||
return true; | ||
}; | ||
if (!utils.bufEqual( | ||
MAGIC_BYTES, | ||
tap.subarray(0, MAGIC_BYTES.length) | ||
)) { | ||
this.emit('error', new Error('invalid magic bytes')); | ||
return false; | ||
} | ||
BlockDecoder.prototype._write = function (chunk, encoding, cb) { | ||
var tap = this._tap; | ||
tap.buf = Buffer.concat([tap.buf, chunk]); | ||
tap.pos = 0; | ||
let header = HEADER_TYPE._read(tap); | ||
if (!tap.isValid()) { | ||
return false; | ||
} | ||
if (!this._decodeHeader()) { | ||
process.nextTick(cb); | ||
return; | ||
} | ||
this._codec = DECODER.decode(header.meta['avro.codec']) || 'null'; | ||
let codecs = this._codecs || BlockDecoder.getDefaultCodecs(); | ||
this._decompress = codecs[this._codec]; | ||
if (!this._decompress) { | ||
this.emit('error', new Error(`unknown codec: ${this._codec}`)); | ||
return; | ||
} | ||
// We got the header, switch to block decoding mode. Also, call it directly | ||
// in case we already have all the data (in which case `_write` wouldn't get | ||
// called anymore). | ||
this._write = this._writeChunk; | ||
this._write(utils.newBuffer(0), encoding, cb); | ||
}; | ||
try { | ||
let schema = JSON.parse(DECODER.decode(header.meta['avro.schema'])); | ||
if (this._parseHook) { | ||
schema = this._parseHook(schema); | ||
} | ||
this._wType = types.Type.forSchema(schema); | ||
} catch (err) { | ||
this.emit('error', err); | ||
return; | ||
} | ||
BlockDecoder.prototype._writeChunk = function (chunk, encoding, cb) { | ||
var tap = this._tap; | ||
tap.buf = Buffer.concat([tap.buf.slice(tap.pos), chunk]); | ||
tap.pos = 0; | ||
var nBlocks = 1; | ||
var block; | ||
while ((block = tryReadBlock(tap))) { | ||
if (!this._syncMarker.equals(block.sync)) { | ||
this.emit('error', new Error('invalid sync marker')); | ||
try { | ||
this._readValue = createReader(this._noDecode, this._wType, this._rType); | ||
} catch (err) { | ||
this.emit('error', err); | ||
return; | ||
} | ||
nBlocks++; | ||
this._decompress( | ||
block.data, | ||
this._createBlockCallback(block.count, chunkCb) | ||
); | ||
this._syncMarker = header.sync; | ||
this.emit('metadata', this._wType, this._codec, header); | ||
return true; | ||
} | ||
chunkCb(); | ||
function chunkCb() { | ||
if (!--nBlocks) { | ||
cb(); | ||
_write (chunk, encoding, cb) { | ||
let tap = this._tap; | ||
tap.append(chunk); | ||
if (!this._decodeHeader()) { | ||
process.nextTick(cb); | ||
return; | ||
} | ||
// We got the header, switch to block decoding mode. Also, call it directly | ||
// in case we already have all the data (in which case `_write` wouldn't get | ||
// called anymore). | ||
this._write = this._writeChunk; | ||
this._write(new Uint8Array(0), encoding, cb); | ||
} | ||
}; | ||
BlockDecoder.prototype._createBlockCallback = function (count, cb) { | ||
var self = this; | ||
var index = this._index++; | ||
_writeChunk (chunk, encoding, cb) { | ||
let tap = this._tap; | ||
tap.forward(chunk); | ||
return function (cause, data) { | ||
if (cause) { | ||
var err = new Error(f('%s codec decompression error', self._codec)); | ||
err.cause = cause; | ||
self.emit('error', err); | ||
cb(); | ||
} else { | ||
self._queue.push(new BlockData(index, data, cb, count)); | ||
if (self._needPush) { | ||
self._read(); | ||
let nBlocks = 1; | ||
let block; | ||
while ((block = tryReadBlock(tap))) { | ||
if (!utils.bufEqual(this._syncMarker, block.sync)) { | ||
this.emit('error', new Error('invalid sync marker')); | ||
return; | ||
} | ||
nBlocks++; | ||
this._decompress( | ||
block.data, | ||
this._createBlockCallback(block.data.length, block.count, chunkCb) | ||
); | ||
} | ||
}; | ||
}; | ||
chunkCb(); | ||
BlockDecoder.prototype._read = function () { | ||
this._needPush = false; | ||
var tap = this._blockTap; | ||
if (!this._remaining) { | ||
var data = this._queue.pop(); | ||
if (!data || !data.count) { | ||
if (this._finished) { | ||
this.push(null); | ||
} else { | ||
this._needPush = true; | ||
function chunkCb() { | ||
if (!--nBlocks) { | ||
cb(); | ||
} | ||
if (data) { | ||
data.cb(); | ||
} | ||
return; // Wait for more data. | ||
} | ||
data.cb(); | ||
this._remaining = data.count; | ||
tap.buf = data.buf; | ||
tap.pos = 0; | ||
} | ||
this._remaining--; | ||
this.push(this._readValue(tap)); // The read is guaranteed valid. | ||
}; | ||
_createBlockCallback (size, count, cb) { | ||
let self = this; | ||
let index = this._index++; | ||
return function (cause, data) { | ||
if (cause) { | ||
let err = new Error(`${self._codec} codec decompression error`); | ||
err.cause = cause; | ||
self.emit('error', err); | ||
cb(); | ||
} else { | ||
self.emit('block', new BlockInfo(count, data.length, size)); | ||
self._queue.push(new BlockData(index, data, cb, count)); | ||
if (self._needPush) { | ||
self._read(); | ||
} | ||
} | ||
}; | ||
} | ||
/** Duplex stream for encoding. */ | ||
function RawEncoder(schema, opts) { | ||
opts = opts || {}; | ||
_read () { | ||
this._needPush = false; | ||
stream.Transform.call(this, { | ||
writableObjectMode: true, | ||
allowHalfOpen: false | ||
}); | ||
let tap = this._blockTap; | ||
if (!this._remaining) { | ||
let data = this._queue.pop(); | ||
if (!data || !data.count) { | ||
if (this._finished) { | ||
this.push(null); | ||
} else { | ||
this._needPush = true; | ||
} | ||
if (data) { | ||
data.cb(); | ||
} | ||
return; // Wait for more data. | ||
} | ||
data.cb(); | ||
this._remaining = data.count; | ||
tap.setData(data.buf); | ||
} | ||
this._type = types.Type.forSchema(schema); | ||
this._writeValue = function (tap, val) { | ||
this._remaining--; | ||
let val; | ||
try { | ||
this._type._write(tap, val); | ||
val = this._readValue(tap); | ||
if (!tap.isValid()) { | ||
throw new Error('truncated block'); | ||
} | ||
} catch (err) { | ||
this.emit('error', err); | ||
this._remaining = 0; | ||
this.emit('error', err); // Corrupt data. | ||
return; | ||
} | ||
}; | ||
this._tap = new Tap(utils.newBuffer(opts.batchSize || 65536)); | ||
this.push(val); | ||
} | ||
} | ||
util.inherits(RawEncoder, stream.Transform); | ||
RawEncoder.prototype._transform = function (val, encoding, cb) { | ||
var tap = this._tap; | ||
var buf = tap.buf; | ||
var pos = tap.pos; | ||
this._writeValue(tap, val); | ||
if (!tap.isValid()) { | ||
if (pos) { | ||
// Emit any valid data. | ||
this.push(copyBuffer(tap.buf, 0, pos)); | ||
} | ||
var len = tap.pos - pos; | ||
if (len > buf.length) { | ||
// Not enough space for last written object, need to resize. | ||
tap.buf = utils.newBuffer(2 * len); | ||
} | ||
tap.pos = 0; | ||
this._writeValue(tap, val); // Rewrite last failed write. | ||
/** Duplex stream for encoding. */ | ||
class RawEncoder extends stream.Transform { | ||
constructor (schema, opts) { | ||
opts = opts || {}; | ||
super({ | ||
writableObjectMode: true, | ||
allowHalfOpen: false | ||
}); | ||
this._type = types.Type.forSchema(schema); | ||
this._writeValue = function (tap, val) { | ||
try { | ||
this._type._write(tap, val); | ||
} catch (err) { | ||
this.emit('typeError', err, val, this._type); | ||
} | ||
}; | ||
this._tap = Tap.withCapacity(opts.batchSize || 65536); | ||
this.on('typeError', function (err) { this.emit('error', err); }); | ||
} | ||
cb(); | ||
}; | ||
_transform (val, encoding, cb) { | ||
let tap = this._tap; | ||
let pos = tap.pos; | ||
RawEncoder.prototype._flush = function (cb) { | ||
var tap = this._tap; | ||
var pos = tap.pos; | ||
if (pos) { | ||
// This should only ever be false if nothing is written to the stream. | ||
this.push(tap.buf.slice(0, pos)); | ||
this._writeValue(tap, val); | ||
if (!tap.isValid()) { | ||
if (pos) { | ||
// Emit any valid data. | ||
this.push(tap.toBuffer()); | ||
} | ||
let len = tap.pos - pos; | ||
if (len > tap.length) { | ||
// Not enough space for last written object, need to resize. | ||
tap.reinitialize(2 * len); | ||
} | ||
tap.pos = 0; | ||
this._writeValue(tap, val); // Rewrite last failed write. | ||
} | ||
cb(); | ||
} | ||
cb(); | ||
}; | ||
_flush (cb) { | ||
let tap = this._tap; | ||
let pos = tap.pos; | ||
if (pos) { | ||
// This should only ever be false if nothing is written to the stream. | ||
this.push(tap.subarray(0, pos)); | ||
} | ||
cb(); | ||
} | ||
} | ||
/** | ||
@@ -370,188 +389,213 @@ * Duplex stream to write object container files. | ||
*/ | ||
function BlockEncoder(schema, opts) { | ||
opts = opts || {}; | ||
class BlockEncoder extends stream.Duplex { | ||
constructor (schema, opts) { | ||
opts = opts || {}; | ||
stream.Duplex.call(this, { | ||
allowHalfOpen: true, // To support async compressors. | ||
writableObjectMode: true | ||
}); | ||
super({ | ||
allowHalfOpen: true, // To support async compressors. | ||
writableObjectMode: true | ||
}); | ||
var type; | ||
if (types.Type.isType(schema)) { | ||
type = schema; | ||
schema = undefined; | ||
} else { | ||
// Keep full schema to be able to write it to the header later. | ||
type = types.Type.forSchema(schema); | ||
} | ||
let type; | ||
if (types.Type.isType(schema)) { | ||
type = schema; | ||
schema = undefined; | ||
} else { | ||
// Keep full schema to be able to write it to the header later. | ||
type = types.Type.forSchema(schema); | ||
} | ||
this._schema = schema; | ||
this._type = type; | ||
this._writeValue = function (tap, val) { | ||
try { | ||
this._type._write(tap, val); | ||
} catch (err) { | ||
this.emit('error', err); | ||
this._schema = schema; | ||
this._type = type; | ||
this._writeValue = function (tap, val) { | ||
try { | ||
this._type._write(tap, val); | ||
} catch (err) { | ||
this.emit('typeError', err, val, this._type); | ||
return false; | ||
} | ||
return true; | ||
}; | ||
this._blockSize = opts.blockSize || 65536; | ||
this._tap = Tap.withCapacity(this._blockSize); | ||
this._codecs = opts.codecs; | ||
this._codec = opts.codec || 'null'; | ||
this._blockCount = 0; | ||
this._syncMarker = opts.syncMarker || new utils.Lcg().nextBuffer(16); | ||
this._queue = new utils.OrderedQueue(); | ||
this._pending = 0; | ||
this._finished = false; | ||
this._needHeader = false; | ||
this._needPush = false; | ||
this._metadata = opts.metadata || {}; | ||
if (!MAP_BYTES_TYPE.isValid(this._metadata)) { | ||
throw new Error('invalid metadata'); | ||
} | ||
}; | ||
this._blockSize = opts.blockSize || 65536; | ||
this._tap = new Tap(utils.newBuffer(this._blockSize)); | ||
this._codecs = opts.codecs; | ||
this._codec = opts.codec || 'null'; | ||
this._blockCount = 0; | ||
this._syncMarker = opts.syncMarker || new utils.Lcg().nextBuffer(16); | ||
this._queue = new utils.OrderedQueue(); | ||
this._pending = 0; | ||
this._finished = false; | ||
this._needHeader = false; | ||
this._needPush = false; | ||
this._metadata = opts.metadata || {}; | ||
if (!MAP_BYTES_TYPE.isValid(this._metadata)) { | ||
throw new Error('invalid metadata'); | ||
let codec = this._codec; | ||
this._compress = (this._codecs || BlockEncoder.getDefaultCodecs())[codec]; | ||
if (!this._compress) { | ||
throw new Error(`unsupported codec: ${codec}`); | ||
} | ||
if (opts.omitHeader !== undefined) { // Legacy option. | ||
opts.writeHeader = opts.omitHeader ? 'never' : 'auto'; | ||
} | ||
switch (opts.writeHeader) { | ||
case false: | ||
case 'never': | ||
break; | ||
// Backwards-compatibility (eager default would be better). | ||
case undefined: | ||
case 'auto': | ||
this._needHeader = true; | ||
break; | ||
default: | ||
this._writeHeader(); | ||
} | ||
this.on('finish', function () { | ||
this._finished = true; | ||
if (this._blockCount) { | ||
this._flushChunk(); | ||
} else if (this._finished && this._needPush) { | ||
// We don't need to check `_isPending` since `_blockCount` is always | ||
// positive after the first flush. | ||
this.push(null); | ||
} | ||
}); | ||
this.on('typeError', function (err) { this.emit('error', err); }); | ||
} | ||
var codec = this._codec; | ||
this._compress = (this._codecs || BlockEncoder.getDefaultCodecs())[codec]; | ||
if (!this._compress) { | ||
throw new Error(f('unsupported codec: %s', codec)); | ||
static defaultCodecs () { | ||
return { | ||
'null': function (buf, cb) { cb(null, buf); } | ||
}; | ||
} | ||
if (opts.omitHeader !== undefined) { // Legacy option. | ||
opts.writeHeader = opts.omitHeader ? 'never' : 'auto'; | ||
static getDefaultCodecs () { | ||
return BlockEncoder.defaultCodecs(); | ||
} | ||
switch (opts.writeHeader) { | ||
case false: | ||
case 'never': | ||
break; | ||
case undefined: // Backwards-compatibility (eager default would be better). | ||
case 'auto': | ||
this._needHeader = true; | ||
break; | ||
default: | ||
this._writeHeader(); | ||
_writeHeader () { | ||
let schema = JSON.stringify( | ||
this._schema ? this._schema : this._type.getSchema({exportAttrs: true}) | ||
); | ||
let meta = utils.copyOwnProperties( | ||
this._metadata, | ||
{ | ||
'avro.schema': ENCODER.encode(schema), | ||
'avro.codec': ENCODER.encode(this._codec) | ||
}, | ||
true // Overwrite. | ||
); | ||
let Header = HEADER_TYPE.getRecordConstructor(); | ||
let header = new Header(MAGIC_BYTES, meta, this._syncMarker); | ||
this.push(header.toBuffer()); | ||
} | ||
this.on('finish', function () { | ||
this._finished = true; | ||
if (this._blockCount) { | ||
this._flushChunk(); | ||
} else if (this._finished && this._needPush) { | ||
// We don't need to check `_isPending` since `_blockCount` is always | ||
// positive after the first flush. | ||
this.push(null); | ||
_write (val, encoding, cb) { | ||
if (this._needHeader) { | ||
this._writeHeader(); | ||
this._needHeader = false; | ||
} | ||
}); | ||
} | ||
util.inherits(BlockEncoder, stream.Duplex); | ||
BlockEncoder.defaultCodecs = function () { | ||
return { | ||
'null': function (buf, cb) { cb(null, buf); }, | ||
'deflate': zlib.deflateRaw | ||
}; | ||
}; | ||
let tap = this._tap; | ||
let pos = tap.pos; | ||
let flushing = false; | ||
BlockEncoder.getDefaultCodecs = BlockEncoder.defaultCodecs; | ||
if (this._writeValue(tap, val)) { | ||
if (!tap.isValid()) { | ||
if (pos) { | ||
this._flushChunk(pos, cb); | ||
flushing = true; | ||
} | ||
let len = tap.pos - pos; | ||
if (len > this._blockSize) { | ||
// Not enough space for last written object, need to resize. | ||
this._blockSize = len * 2; | ||
} | ||
tap.reinitialize(this._blockSize); | ||
this._writeValue(tap, val); // Rewrite last failed write. | ||
} | ||
this._blockCount++; | ||
} else { | ||
tap.pos = pos; | ||
} | ||
BlockEncoder.prototype._writeHeader = function () { | ||
var schema = JSON.stringify( | ||
this._schema ? this._schema : this._type.schema({exportAttrs: true}) | ||
); | ||
var meta = utils.copyOwnProperties( | ||
this._metadata, | ||
{'avro.schema': utils.bufferFrom(schema), 'avro.codec': utils.bufferFrom(this._codec)}, | ||
true // Overwrite. | ||
); | ||
var Header = HEADER_TYPE.recordConstructor; | ||
var header = new Header(MAGIC_BYTES, meta, this._syncMarker); | ||
this.push(header.toBuffer()); | ||
}; | ||
if (!flushing) { | ||
cb(); | ||
} | ||
} | ||
BlockEncoder.prototype._write = function (val, encoding, cb) { | ||
if (this._needHeader) { | ||
this._writeHeader(); | ||
this._needHeader = false; | ||
_flushChunk (pos, cb) { | ||
let tap = this._tap; | ||
pos = pos || tap.pos; | ||
this._compress( | ||
tap.subarray(0, pos), | ||
this._createBlockCallback(pos, cb) | ||
); | ||
this._blockCount = 0; | ||
} | ||
var tap = this._tap; | ||
var pos = tap.pos; | ||
var flushing = false; | ||
this._writeValue(tap, val); | ||
if (!tap.isValid()) { | ||
if (pos) { | ||
this._flushChunk(pos, cb); | ||
flushing = true; | ||
_read () { | ||
let self = this; | ||
let data = this._queue.pop(); | ||
if (!data) { | ||
if (this._finished && !this._pending) { | ||
process.nextTick(() => { self.push(null); }); | ||
} else { | ||
this._needPush = true; | ||
} | ||
return; | ||
} | ||
var len = tap.pos - pos; | ||
if (len > this._blockSize) { | ||
// Not enough space for last written object, need to resize. | ||
this._blockSize = len * 2; | ||
} | ||
tap.buf = utils.newBuffer(this._blockSize); | ||
tap.pos = 0; | ||
this._writeValue(tap, val); // Rewrite last failed write. | ||
} | ||
this._blockCount++; | ||
if (!flushing) { | ||
cb(); | ||
} | ||
}; | ||
this.push(LONG_TYPE.toBuffer(data.count, true)); | ||
this.push(LONG_TYPE.toBuffer(data.buf.length, true)); | ||
this.push(data.buf); | ||
this.push(this._syncMarker); | ||
BlockEncoder.prototype._flushChunk = function (pos, cb) { | ||
var tap = this._tap; | ||
pos = pos || tap.pos; | ||
this._compress(tap.buf.slice(0, pos), this._createBlockCallback(cb)); | ||
this._blockCount = 0; | ||
}; | ||
BlockEncoder.prototype._read = function () { | ||
var self = this; | ||
var data = this._queue.pop(); | ||
if (!data) { | ||
if (this._finished && !this._pending) { | ||
process.nextTick(function () { self.push(null); }); | ||
} else { | ||
this._needPush = true; | ||
if (!this._finished) { | ||
data.cb(); | ||
} | ||
return; | ||
} | ||
this.push(LONG_TYPE.toBuffer(data.count, true)); | ||
this.push(LONG_TYPE.toBuffer(data.buf.length, true)); | ||
this.push(data.buf); | ||
this.push(this._syncMarker); | ||
_createBlockCallback (size, cb) { | ||
let self = this; | ||
let index = this._index++; | ||
let count = this._blockCount; | ||
this._pending++; | ||
if (!this._finished) { | ||
data.cb(); | ||
return function (cause, data) { | ||
if (cause) { | ||
let err = new Error(`${self._codec} codec compression error`); | ||
err.cause = cause; | ||
self.emit('error', err); | ||
return; | ||
} | ||
self._pending--; | ||
self.emit('block', new BlockInfo(count, size, data.length)); | ||
self._queue.push(new BlockData(index, data, cb, count)); | ||
if (self._needPush) { | ||
self._needPush = false; | ||
self._read(); | ||
} | ||
}; | ||
} | ||
}; | ||
} | ||
BlockEncoder.prototype._createBlockCallback = function (cb) { | ||
var self = this; | ||
var index = this._index++; | ||
var count = this._blockCount; | ||
this._pending++; | ||
return function (cause, data) { | ||
if (cause) { | ||
var err = new Error(f('%s codec compression error', self._codec)); | ||
err.cause = cause; | ||
self.emit('error', err); | ||
return; | ||
} | ||
self._pending--; | ||
self._queue.push(new BlockData(index, data, cb, count)); | ||
if (self._needPush) { | ||
self._needPush = false; | ||
self._read(); | ||
} | ||
}; | ||
}; | ||
// Helpers. | ||
/** Summary information about a block. */ | ||
class BlockInfo { | ||
constructor (count, raw, compressed) { | ||
this.valueCount = count; | ||
this.rawDataLength = raw; | ||
this.compressedDataLength = compressed; | ||
} | ||
} | ||
// Helpers. | ||
/** | ||
@@ -563,7 +607,9 @@ * An indexed block. | ||
*/ | ||
function BlockData(index, buf, cb, count) { | ||
this.index = index; | ||
this.buf = buf; | ||
this.cb = cb; | ||
this.count = count | 0; | ||
class BlockData { | ||
constructor (index, buf, cb, count) { | ||
this.index = index; | ||
this.buf = buf; | ||
this.cb = cb; | ||
this.count = count | 0; | ||
} | ||
} | ||
@@ -573,4 +619,4 @@ | ||
function tryReadBlock(tap) { | ||
var pos = tap.pos; | ||
var block = BLOCK_TYPE._read(tap); | ||
let pos = tap.pos; | ||
let block = BLOCK_TYPE._read(tap); | ||
if (!tap.isValid()) { | ||
@@ -588,9 +634,9 @@ tap.pos = pos; | ||
return function (tap) { | ||
var pos = tap.pos; | ||
let pos = tap.pos; | ||
skipper(tap); | ||
return tap.buf.slice(pos, tap.pos); | ||
return tap.subarray(pos, tap.pos); | ||
}; | ||
})(writerType._skip); | ||
} else if (readerType) { | ||
var resolver = readerType.createResolver(writerType); | ||
let resolver = readerType.createResolver(writerType); | ||
return function (tap) { return resolver._read(tap); }; | ||
@@ -602,20 +648,13 @@ } else { | ||
/** Copy a buffer. This avoids creating a slice of the original buffer. */ | ||
function copyBuffer(buf, pos, len) { | ||
var copy = utils.newBuffer(len); | ||
buf.copy(copy, 0, pos, pos + len); | ||
return copy; | ||
} | ||
module.exports = { | ||
BLOCK_TYPE: BLOCK_TYPE, // For tests. | ||
HEADER_TYPE: HEADER_TYPE, // Idem. | ||
MAGIC_BYTES: MAGIC_BYTES, // Idem. | ||
BLOCK_TYPE, // For tests. | ||
HEADER_TYPE, // Idem. | ||
MAGIC_BYTES, // Idem. | ||
streams: { | ||
BlockDecoder: BlockDecoder, | ||
BlockEncoder: BlockEncoder, | ||
RawDecoder: RawDecoder, | ||
RawEncoder: RawEncoder | ||
BlockDecoder, | ||
BlockEncoder, | ||
RawDecoder, | ||
RawEncoder | ||
} | ||
}; |
@@ -1,3 +0,1 @@ | ||
/* jshint node: true */ | ||
'use strict'; | ||
@@ -12,3 +10,3 @@ | ||
var fs = require('fs'), | ||
let fs = require('fs'), | ||
path = require('path'); | ||
@@ -18,5 +16,5 @@ | ||
function createImportHook() { | ||
var imports = {}; | ||
return function (fpath, kind, cb) { | ||
fpath = path.resolve(fpath); | ||
let imports = {}; | ||
return function ({path: fpath, importerPath}, cb) { | ||
fpath = path.resolve(path.dirname(importerPath), fpath); | ||
if (imports[fpath]) { | ||
@@ -28,3 +26,6 @@ // Already imported, return nothing to avoid duplicating attributes. | ||
imports[fpath] = true; | ||
fs.readFile(fpath, {encoding: 'utf8'}, cb); | ||
fs.readFile(fpath, {encoding: 'utf8'}, (err, data) => { | ||
if (err) return cb(err); | ||
return cb(null, {contents: data, path: fpath}); | ||
}); | ||
}; | ||
@@ -42,5 +43,5 @@ } | ||
function createSyncImportHook() { | ||
var imports = {}; | ||
return function (fpath, kind, cb) { | ||
fpath = path.resolve(fpath); | ||
let imports = {}; | ||
return function ({path: fpath, importerPath}, cb) { | ||
fpath = path.resolve(path.dirname(importerPath), fpath); | ||
if (imports[fpath]) { | ||
@@ -50,3 +51,6 @@ cb(); | ||
imports[fpath] = true; | ||
cb(null, fs.readFileSync(fpath, {encoding: 'utf8'})); | ||
cb(null, { | ||
contents: fs.readFileSync(fpath, {encoding: 'utf8'}), | ||
path: fpath | ||
}); | ||
} | ||
@@ -56,9 +60,27 @@ }; | ||
/** | ||
* Check if the given input string is "path-like" or a path to an existing file, | ||
* and if so, read it. This requires it to contain a path separator | ||
* (`path.sep`). If not, this will return `null`. | ||
*/ | ||
function tryReadFileSync(str) { | ||
if ( | ||
typeof str == 'string' && | ||
str.indexOf(path.sep) !== -1 | ||
) { | ||
try { | ||
// Try interpreting `str` as path to a file. | ||
return fs.readFileSync(str, {encoding: 'utf8'}); | ||
} catch (err) { | ||
// If the file doesn't exist, return `null`. Rethrow all other errors. | ||
if (err.code !== 'ENOENT') throw err; | ||
} | ||
} | ||
return null; | ||
} | ||
module.exports = { | ||
createImportHook: createImportHook, | ||
createSyncImportHook: createSyncImportHook, | ||
// Proxy a few methods to better shim them for browserify. | ||
existsSync: fs.existsSync, | ||
readFileSync: fs.readFileSync | ||
createImportHook, | ||
createSyncImportHook, | ||
tryReadFileSync | ||
}; |
@@ -1,3 +0,1 @@ | ||
/* jshint node: true */ | ||
'use strict'; | ||
@@ -12,4 +10,3 @@ | ||
var containers = require('./containers'), | ||
services = require('./services'), | ||
let containers = require('./containers'), | ||
specs = require('./specs'), | ||
@@ -21,9 +18,3 @@ types = require('./types'), | ||
/** Parse a schema and return the corresponding type or service. */ | ||
function parse(any, opts) { | ||
var schemaOrProtocol = specs.read(any); | ||
return schemaOrProtocol.protocol ? | ||
services.Service.forProtocol(schemaOrProtocol, opts) : | ||
types.Type.forSchema(schemaOrProtocol, opts); | ||
} | ||
const DECODER = new TextDecoder(); | ||
@@ -34,15 +25,18 @@ /** Extract a container file's header synchronously. */ | ||
var decode = opts.decode === undefined ? true : !!opts.decode; | ||
var size = Math.max(opts.size || 4096, 4); | ||
var fd = fs.openSync(path, 'r'); | ||
var buf = utils.newBuffer(size); | ||
var pos = 0; | ||
var tap = new utils.Tap(buf); | ||
var header = null; | ||
let decode = opts.decode === undefined ? true : !!opts.decode; | ||
let size = Math.max(opts.size || 4096, 4); | ||
let buf = new Uint8Array(size); | ||
let tap = new utils.Tap(buf); | ||
let fd = fs.openSync(path, 'r'); | ||
while (pos < 4) { | ||
// Make sure we have enough to check the magic bytes. | ||
pos += fs.readSync(fd, buf, pos, size - pos); | ||
} | ||
if (containers.MAGIC_BYTES.equals(buf.slice(0, 4))) { | ||
try { | ||
let pos = fs.readSync(fd, buf, 0, size); | ||
if ( | ||
pos < 4 || | ||
!utils.bufEqual(containers.MAGIC_BYTES, buf.subarray(0, 4)) | ||
) { | ||
return null; | ||
} | ||
let header = null; | ||
do { | ||
@@ -52,11 +46,12 @@ header = containers.HEADER_TYPE._read(tap); | ||
if (decode !== false) { | ||
var meta = header.meta; | ||
meta['avro.schema'] = JSON.parse(meta['avro.schema'].toString()); | ||
let meta = header.meta; | ||
meta['avro.schema'] = JSON.parse(DECODER.decode(meta['avro.schema'])); | ||
if (meta['avro.codec'] !== undefined) { | ||
meta['avro.codec'] = meta['avro.codec'].toString(); | ||
meta['avro.codec'] = DECODER.decode(meta['avro.codec']); | ||
} | ||
} | ||
return header; | ||
} finally { | ||
fs.closeSync(fd); | ||
} | ||
fs.closeSync(fd); | ||
return header; | ||
@@ -67,7 +62,6 @@ function isValid() { | ||
} | ||
var len = 2 * tap.buf.length; | ||
var buf = utils.newBuffer(len); | ||
let len = 2 * tap.length; | ||
let buf = new Uint8Array(len); | ||
len = fs.readSync(fd, buf, 0, len); | ||
tap.buf = Buffer.concat([tap.buf, buf]); | ||
tap.pos = 0; | ||
tap.append(buf); | ||
return false; | ||
@@ -85,3 +79,3 @@ } | ||
function createFileEncoder(path, schema, opts) { | ||
var encoder = new containers.streams.BlockEncoder(schema, opts); | ||
let encoder = new containers.streams.BlockEncoder(schema, opts); | ||
encoder.pipe(fs.createWriteStream(path, {defaultEncoding: 'binary'})); | ||
@@ -93,14 +87,11 @@ return encoder; | ||
module.exports = { | ||
Service: services.Service, | ||
Type: types.Type, | ||
assembleProtocol: specs.assembleProtocol, | ||
createFileDecoder: createFileDecoder, | ||
createFileEncoder: createFileEncoder, | ||
discoverProtocol: services.discoverProtocol, | ||
extractFileHeader: extractFileHeader, | ||
parse: parse, | ||
createFileDecoder, | ||
createFileEncoder, | ||
extractFileHeader, | ||
readProtocol: specs.readProtocol, | ||
readSchema: specs.readSchema, | ||
streams: containers.streams, | ||
types: types.builtins | ||
types: types.builtins, | ||
}; |
1071
lib/specs.js
@@ -1,3 +0,1 @@ | ||
/* jshint node: true */ | ||
// TODO: Add minimal templating. | ||
@@ -9,15 +7,10 @@ // TODO: Add option to prefix nested type declarations with the outer types' | ||
/** IDL to protocol (services) and schema (types) parsing logic. */ | ||
/** IDL parsing logic. */ | ||
var files = require('./files'), | ||
utils = require('./utils'), | ||
path = require('path'), | ||
util = require('util'); | ||
let files = require('./files'), | ||
utils = require('./utils'); | ||
var f = util.format; | ||
// Default type references defined by Avro. | ||
var TYPE_REFS = { | ||
let TYPE_REFS = { | ||
date: {type: 'int', logicalType: 'date'}, | ||
@@ -41,7 +34,3 @@ decimal: {type: 'bytes', logicalType: 'decimal'}, | ||
// Types found in imports. We store them separately to be able to insert them | ||
// in the correct order in the final attributes. | ||
var importedTypes = []; | ||
var protocol, imports; | ||
opts.importHook(fpath, 'idl', function (err, str) { | ||
importFile(fpath, '', (err, protocol) => { | ||
if (err) { | ||
@@ -51,38 +40,89 @@ cb(err); | ||
} | ||
if (str === undefined) { | ||
// Skipped import (likely already imported). | ||
cb(null, {}); | ||
if (!protocol) { | ||
cb(new Error('empty root import')); | ||
return; | ||
} | ||
try { | ||
var reader = new Reader(str, opts); | ||
var obj = reader._readProtocol(str, opts); | ||
} catch (err) { | ||
err.path = fpath; // To help debug which file caused the error. | ||
cb(err); | ||
return; | ||
let schemas = protocol.types; | ||
if (schemas) { | ||
// Strip redundant namespaces from types before returning the protocol. | ||
// Note that we keep empty (`''`) nested namespaces when the outer one is | ||
// non-empty. This allows figuring out whether unqualified imported names | ||
// should be qualified by the protocol's namespace: they should if their | ||
// namespace is `undefined` and should not if it is empty. | ||
let namespace = protocolNamespace(protocol) || ''; | ||
schemas.forEach((schema) => { | ||
if (schema.namespace === namespace) { | ||
delete schema.namespace; | ||
} | ||
}); | ||
} | ||
protocol = obj.protocol; | ||
imports = obj.imports; | ||
fetchImports(); | ||
cb(null, protocol); | ||
}); | ||
function fetchImports() { | ||
var info = imports.shift(); | ||
if (!info) { | ||
// We are done with this file. We prepend all imported types to this | ||
// file's and we can return the final result. | ||
if (importedTypes.length) { | ||
protocol.types = protocol.types ? | ||
importedTypes.concat(protocol.types) : | ||
importedTypes; | ||
function importFile(fpath, importerPath, cb) { | ||
opts.importHook({path: fpath, importerPath, kind: 'idl'}, (err, payload) => { | ||
if (err) { | ||
cb(err); | ||
return; | ||
} | ||
cb(null, protocol); | ||
} else { | ||
var importPath = path.join(path.dirname(fpath), info.name); | ||
if (!payload) { | ||
// This signals an already imported file by the default import hooks. | ||
// Implementors who wish to disallow duplicate imports should provide a | ||
// custom hook which throws an error when a duplicate is detected. | ||
cb(); | ||
return; | ||
} | ||
const {contents: str, path: fpath} = payload; | ||
let obj; | ||
try { | ||
let reader = new Reader(str, opts); | ||
obj = reader._readProtocol(str, opts); | ||
} catch (err) { | ||
err.path = fpath; // To help debug which file caused the error. | ||
cb(err); | ||
return; | ||
} | ||
fetchImports(obj.protocol, obj.imports, fpath, cb); | ||
}); | ||
} | ||
function fetchImports(protocol, imports, fpath, cb) { | ||
let importedProtocols = []; | ||
next(); | ||
function next() { | ||
let info = imports.shift(); | ||
if (!info) { | ||
// We are done with this file. We prepend all imported types to this | ||
// file's and we can return the final result. | ||
importedProtocols.reverse(); | ||
try { | ||
importedProtocols.forEach((imported) => { | ||
mergeImport(protocol, imported); | ||
}); | ||
} catch (err) { | ||
cb(err); | ||
return; | ||
} | ||
cb(null, protocol); | ||
return; | ||
} | ||
if (info.kind === 'idl') { | ||
assembleProtocol(importPath, opts, mergeImportedSchema); | ||
importFile(info.name, fpath, (err, imported) => { | ||
if (err) { | ||
cb(err); | ||
return; | ||
} | ||
if (imported) { | ||
importedProtocols.push(imported); | ||
} | ||
next(); | ||
}); | ||
} else { | ||
// We are importing a protocol or schema file. | ||
opts.importHook(importPath, info.kind, function (err, str) { | ||
opts.importHook({ | ||
path: info.name, | ||
importerPath: fpath, | ||
kind: info.kind | ||
}, (err, payload) => { | ||
if (err) { | ||
@@ -94,23 +134,23 @@ cb(err); | ||
case 'protocol': | ||
case 'schema': | ||
if (str === undefined) { | ||
// Flag used to signal an already imported file by the default | ||
// import hooks. Implementors who wish to disallow duplicate | ||
// imports should provide a custom hook which throws an error | ||
// when a duplicate import is detected. | ||
mergeImportedSchema(null, {}); | ||
case 'schema': { | ||
if (!payload) { | ||
// Skip duplicate import (see related comment above). | ||
next(); | ||
return; | ||
} | ||
let obj; | ||
try { | ||
var obj = JSON.parse(str); | ||
obj = JSON.parse(payload.contents); | ||
} catch (err) { | ||
err.path = importPath; | ||
err.path = payload.path; | ||
cb(err); | ||
return; | ||
} | ||
var schema = info.kind === 'schema' ? {types: [obj]} : obj; | ||
mergeImportedSchema(null, schema); | ||
break; | ||
let imported = info.kind === 'schema' ? {types: [obj]} : obj; | ||
importedProtocols.push(imported); | ||
next(); | ||
return; | ||
} | ||
default: | ||
cb(new Error(f('invalid import kind: %s', info.kind))); | ||
cb(new Error(`invalid import kind: ${info.kind}`)); | ||
} | ||
@@ -122,40 +162,28 @@ }); | ||
function mergeImportedSchema(err, importedSchema) { | ||
if (err) { | ||
cb(err); | ||
return; | ||
} | ||
// Merge first the types (where we don't need to check for duplicates | ||
function mergeImport(protocol, imported) { | ||
// Merge first the types (where we don't need to check for duplicates | ||
// since instantiating the service will take care of it), then the messages | ||
// (where we need to, as duplicates will overwrite each other). | ||
(importedSchema.types || []).forEach(function (typeSchema) { | ||
let schemas = imported.types || []; | ||
schemas.reverse(); | ||
schemas.forEach((schema) => { | ||
if (!protocol.types) { | ||
protocol.types = []; | ||
} | ||
// Ensure the imported protocol's namespace is inherited correctly (it | ||
// might be different from the current one). | ||
if (typeSchema.namespace === undefined) { | ||
var namespace = importedSchema.namespace; | ||
if (!namespace) { | ||
var match = /^(.*)\.[^.]+$/.exec(importedSchema.protocol); | ||
if (match) { | ||
namespace = match[1]; | ||
} | ||
} | ||
typeSchema.namespace = namespace || ''; | ||
if (schema.namespace === undefined) { | ||
schema.namespace = protocolNamespace(imported) || ''; | ||
} | ||
importedTypes.push(typeSchema); | ||
protocol.types.unshift(schema); | ||
}); | ||
try { | ||
Object.keys(importedSchema.messages || {}).forEach(function (name) { | ||
if (!protocol.messages) { | ||
protocol.messages = {}; | ||
} | ||
if (protocol.messages[name]) { | ||
throw new Error(f('duplicate message: %s', name)); | ||
} | ||
protocol.messages[name] = importedSchema.messages[name]; | ||
}); | ||
} catch (err) { | ||
cb(err); | ||
return; | ||
} | ||
fetchImports(); // Continue importing any remaining imports. | ||
Object.keys(imported.messages || {}).forEach((name) => { | ||
if (!protocol.messages) { | ||
protocol.messages = {}; | ||
} | ||
if (protocol.messages[name]) { | ||
throw new Error(`duplicate message: ${name}`); | ||
} | ||
protocol.messages[name] = imported.messages[name]; | ||
}); | ||
} | ||
@@ -176,3 +204,3 @@ } | ||
* | ||
* + If `str` contains `path.sep` (on windows `\`, otherwise `/`) and is a path | ||
* + If `str` contains `/` and is a path | ||
* to an existing file, it will first be read as JSON, then as an IDL | ||
@@ -190,18 +218,16 @@ * specification if JSON parsing failed. If either succeeds, the result is | ||
function read(str) { | ||
var schema; | ||
if (typeof str == 'string' && ~str.indexOf(path.sep) && files.existsSync(str)) { | ||
// Try interpreting `str` as path to a file contain a JSON schema or an IDL | ||
// protocol. Note that we add the second check to skip primitive references | ||
// (e.g. `"int"`, the most common use-case for `avro.parse`). | ||
var contents = files.readFileSync(str, {encoding: 'utf8'}); | ||
let schema; | ||
let contents = files.tryReadFileSync(str); | ||
if (contents === null) { | ||
schema = str; | ||
} else { | ||
try { | ||
return JSON.parse(contents); | ||
} catch (err) { | ||
var opts = {importHook: files.createSyncImportHook()}; | ||
assembleProtocol(str, opts, function (err, protocolSchema) { | ||
let opts = {importHook: files.createSyncImportHook()}; | ||
assembleProtocol(str, opts, (err, protocolSchema) => { | ||
schema = err ? contents : protocolSchema; | ||
}); | ||
} | ||
} else { | ||
schema = str; | ||
} | ||
@@ -229,306 +255,320 @@ if (typeof schema != 'string' || schema === 'null') { | ||
function Reader(str, opts) { | ||
opts = opts || {}; | ||
class Reader { | ||
constructor (str, opts) { | ||
opts = opts || {}; | ||
this._tk = new Tokenizer(str); | ||
this._ackVoidMessages = !!opts.ackVoidMessages; | ||
this._implicitTags = !opts.delimitedCollections; | ||
this._typeRefs = opts.typeRefs || TYPE_REFS; | ||
} | ||
this._tk = new Tokenizer(str); | ||
this._ackVoidMessages = !!opts.ackVoidMessages; | ||
this._implicitTags = !opts.delimitedCollections; | ||
this._typeRefs = opts.typeRefs || TYPE_REFS; | ||
} | ||
Reader.readProtocol = function (str, opts) { | ||
var reader = new Reader(str, opts); | ||
var protocol = reader._readProtocol(); | ||
if (protocol.imports.length) { | ||
// Imports can only be resolved when the IDL file is provided via its | ||
// path, we fail rather than silently ignore imports. | ||
throw new Error('unresolvable import'); | ||
static readProtocol (str, opts) { | ||
let reader = new Reader(str, opts); | ||
let protocol = reader._readProtocol(); | ||
if (protocol.imports.length) { | ||
// Imports can only be resolved when the IDL file is provided via its | ||
// path, we fail rather than silently ignore imports. | ||
throw new Error('unresolvable import'); | ||
} | ||
return protocol.protocol; | ||
} | ||
return protocol.protocol; | ||
}; | ||
Reader.readSchema = function (str, opts) { | ||
var reader = new Reader(str, opts); | ||
var javadoc = reader._readJavadoc(); | ||
var schema = reader._readType(javadoc === undefined ? {} : {doc: javadoc}); | ||
reader._tk.next({id: '(eof)'}); // Check that we have read everything. | ||
return schema; | ||
}; | ||
static readSchema (str, opts) { | ||
let reader = new Reader(str, opts); | ||
let doc = reader._readJavadoc(); | ||
let schema = reader._readType(doc === undefined ? {} : {doc}, true); | ||
reader._tk.next({id: '(eof)'}); // Check that we have read everything. | ||
return schema; | ||
} | ||
Reader.prototype._readProtocol = function () { | ||
var tk = this._tk; | ||
var imports = []; | ||
var types = []; | ||
var messages = {}; | ||
var pos; | ||
_readProtocol () { | ||
let tk = this._tk; | ||
let imports = []; | ||
let types = []; | ||
let messages = {}; | ||
// Outer declarations (outside of the protocol block). | ||
this._readImports(imports); | ||
var protocolSchema = {}; | ||
var protocolJavadoc = this._readJavadoc(); | ||
if (protocolJavadoc !== undefined) { | ||
protocolSchema.doc = protocolJavadoc; | ||
} | ||
this._readAnnotations(protocolSchema); | ||
tk.next({val: 'protocol'}); | ||
if (!tk.next({val: '{', silent: true})) { | ||
// Named protocol. | ||
protocolSchema.protocol = tk.next({id: 'name'}).val; | ||
tk.next({val: '{'}); | ||
} | ||
// Outer declarations (outside of the protocol block). | ||
this._readImports(imports); | ||
let protocolSchema = {}; | ||
let protocolJavadoc = this._readJavadoc(); | ||
if (protocolJavadoc !== undefined) { | ||
protocolSchema.doc = protocolJavadoc; | ||
} | ||
this._readAnnotations(protocolSchema); | ||
tk.next({val: 'protocol'}); | ||
if (!tk.next({val: '{', silent: true})) { | ||
// Named protocol. | ||
protocolSchema.protocol = tk.next({id: 'name'}).val; | ||
tk.next({val: '{'}); | ||
} | ||
// Inner declarations. | ||
while (!tk.next({val: '}', silent: true})) { | ||
if (!this._readImports(imports)) { | ||
var javadoc = this._readJavadoc(); | ||
var typeSchema = this._readType(); | ||
var numImports = this._readImports(imports, true); | ||
var message = undefined; | ||
// We mark our position and try to parse a message from here. | ||
pos = tk.pos; | ||
if (!numImports && (message = this._readMessage(typeSchema))) { | ||
// Note that if any imports were found, we cannot be parsing a message. | ||
if (javadoc !== undefined && message.schema.doc === undefined) { | ||
message.schema.doc = javadoc; | ||
} | ||
var oneWay = false; | ||
if ( | ||
message.schema.response === 'void' || | ||
message.schema.response.type === 'void' | ||
) { | ||
oneWay = !this._ackVoidMessages && !message.schema.errors; | ||
if (message.schema.response === 'void') { | ||
message.schema.response = 'null'; | ||
} else { | ||
message.schema.response.type = 'null'; | ||
// Inner declarations. | ||
while (!tk.next({val: '}', silent: true})) { | ||
if (!this._readImports(imports)) { | ||
let javadoc = this._readJavadoc(); | ||
let typeSchema = this._readType({}, true); | ||
let numImports = this._readImports(imports, true); | ||
let message = undefined; | ||
// We mark our position and try to parse a message from here. | ||
let pos = tk.pos; | ||
if (!numImports && (message = this._readMessage(typeSchema))) { | ||
// Note that if any imports were found, we cannot be parsing a | ||
// message. | ||
if (javadoc !== undefined && message.schema.doc === undefined) { | ||
message.schema.doc = javadoc; | ||
} | ||
} | ||
if (oneWay) { | ||
message.schema['one-way'] = true; | ||
} | ||
if (messages[message.name]) { | ||
// We have to do this check here otherwise the duplicate will be | ||
// overwritten (and service instantiation won't be able to catch it). | ||
throw new Error(f('duplicate message: %s', message.name)); | ||
} | ||
messages[message.name] = message.schema; | ||
} else { | ||
// This was a standalone type definition. | ||
if (javadoc) { | ||
if (typeof typeSchema == 'string') { | ||
typeSchema = {doc: javadoc, type: typeSchema}; | ||
} else if (typeSchema.doc === undefined) { | ||
typeSchema.doc = javadoc; | ||
let oneWay = false; | ||
if ( | ||
message.schema.response === 'void' || | ||
message.schema.response.type === 'void' | ||
) { | ||
oneWay = !this._ackVoidMessages && !message.schema.errors; | ||
if (message.schema.response === 'void') { | ||
message.schema.response = 'null'; | ||
} else { | ||
message.schema.response.type = 'null'; | ||
} | ||
} | ||
if (oneWay) { | ||
message.schema['one-way'] = true; | ||
} | ||
if (messages[message.name]) { | ||
// We have to do this check here otherwise the duplicate will be | ||
// overwritten (and service instantiation won't be able to catch | ||
// it). | ||
throw new Error(`duplicate message: ${message.name}`); | ||
} | ||
messages[message.name] = message.schema; | ||
} else { | ||
// This was a standalone type definition. | ||
if (javadoc) { | ||
if (typeof typeSchema == 'string') { | ||
typeSchema = {doc: javadoc, type: typeSchema}; | ||
} else if (typeSchema.doc === undefined) { | ||
typeSchema.doc = javadoc; | ||
} | ||
} | ||
types.push(typeSchema); | ||
// We backtrack until just before the type's type name and swallow an | ||
// eventual semi-colon (to make type declarations more consistent). | ||
tk.pos = pos; | ||
tk.next({val: ';', silent: true}); | ||
} | ||
types.push(typeSchema); | ||
// We backtrack until just before the type's type name and swallow an | ||
// eventual semi-colon (to make type declarations more consistent). | ||
tk.pos = pos; | ||
tk.next({val: ';', silent: true}); | ||
javadoc = undefined; | ||
} | ||
javadoc = undefined; | ||
} | ||
tk.next({id: '(eof)'}); | ||
if (types.length) { | ||
protocolSchema.types = types; | ||
} | ||
if (Object.keys(messages).length) { | ||
protocolSchema.messages = messages; | ||
} | ||
return {protocol: protocolSchema, imports}; | ||
} | ||
tk.next({id: '(eof)'}); | ||
if (types.length) { | ||
protocolSchema.types = types; | ||
} | ||
if (Object.keys(messages).length) { | ||
protocolSchema.messages = messages; | ||
} | ||
return {protocol: protocolSchema, imports: imports}; | ||
}; | ||
Reader.prototype._readAnnotations = function (schema) { | ||
var tk = this._tk; | ||
while (tk.next({val: '@', silent: true})) { | ||
// Annotations are allowed to have names which aren't valid Avro names, | ||
// we must advance until we hit the first left parenthesis. | ||
var parts = []; | ||
while (!tk.next({val: '(', silent: true})) { | ||
parts.push(tk.next().val); | ||
_readAnnotations (schema) { | ||
let tk = this._tk; | ||
while (tk.next({val: '@', silent: true})) { | ||
// Annotations are allowed to have names which aren't valid Avro names, | ||
// we must advance until we hit the first left parenthesis. | ||
let parts = []; | ||
while (!tk.next({val: '(', silent: true})) { | ||
parts.push(tk.next().val); | ||
} | ||
schema[parts.join('')] = tk.next({id: 'json'}).val; | ||
tk.next({val: ')'}); | ||
} | ||
schema[parts.join('')] = tk.next({id: 'json'}).val; | ||
tk.next({val: ')'}); | ||
} | ||
}; | ||
Reader.prototype._readMessage = function (responseSchema) { | ||
var tk = this._tk; | ||
var schema = {request: [], response: responseSchema}; | ||
this._readAnnotations(schema); | ||
var name = tk.next().val; | ||
if (tk.next().val !== '(') { | ||
// This isn't a message. | ||
return; | ||
} | ||
if (!tk.next({val: ')', silent: true})) { | ||
do { | ||
schema.request.push(this._readField()); | ||
} while (!tk.next({val: ')', silent: true}) && tk.next({val: ','})); | ||
} | ||
var token = tk.next(); | ||
switch (token.val) { | ||
case 'throws': | ||
// It doesn't seem like the IDL is explicit about which syntax to used | ||
// for multiple errors. We will assume a comma-separated list. | ||
schema.errors = []; | ||
_readMessage (responseSchema) { | ||
let tk = this._tk; | ||
let schema = {request: [], response: responseSchema}; | ||
this._readAnnotations(schema); | ||
let name = tk.next().val; | ||
if (tk.next().val !== '(') { | ||
// This isn't a message. | ||
return; | ||
} | ||
if (!tk.next({val: ')', silent: true})) { | ||
do { | ||
schema.errors.push(this._readType()); | ||
} while (!tk.next({val: ';', silent: true}) && tk.next({val: ','})); | ||
break; | ||
case 'oneway': | ||
schema['one-way'] = true; | ||
tk.next({val: ';'}); | ||
break; | ||
case ';': | ||
break; | ||
default: | ||
throw tk.error('invalid message suffix', token); | ||
schema.request.push(this._readField()); | ||
} while (!tk.next({val: ')', silent: true}) && tk.next({val: ','})); | ||
} | ||
let token = tk.next(); | ||
switch (token.val) { | ||
case 'throws': | ||
// It doesn't seem like the IDL is explicit about which syntax to used | ||
// for multiple errors. We will assume a comma-separated list. | ||
schema.errors = []; | ||
do { | ||
schema.errors.push(this._readType()); | ||
} while (!tk.next({val: ';', silent: true}) && tk.next({val: ','})); | ||
break; | ||
case 'oneway': | ||
schema['one-way'] = true; | ||
tk.next({val: ';'}); | ||
break; | ||
case ';': | ||
break; | ||
default: | ||
throw tk.error('invalid message suffix', token); | ||
} | ||
return {name, schema}; | ||
} | ||
return {name: name, schema: schema}; | ||
}; | ||
Reader.prototype._readJavadoc = function () { | ||
var token = this._tk.next({id: 'javadoc', emitJavadoc: true, silent: true}); | ||
if (token) { | ||
return token.val; | ||
_readJavadoc () { | ||
let token = this._tk.next({id: 'javadoc', emitJavadoc: true, silent: true}); | ||
if (token) { | ||
return token.val; | ||
} | ||
} | ||
}; | ||
Reader.prototype._readField = function () { | ||
var tk = this._tk; | ||
var javadoc = this._readJavadoc(); | ||
var schema = {type: this._readType()}; | ||
if (javadoc !== undefined && schema.doc === undefined) { | ||
schema.doc = javadoc; | ||
_readField () { | ||
let tk = this._tk; | ||
let javadoc = this._readJavadoc(); | ||
let schema = {type: this._readType()}; | ||
if (javadoc !== undefined && schema.doc === undefined) { | ||
schema.doc = javadoc; | ||
} | ||
const isOptional = tk.next({id: 'operator', val: '?', silent: true}); | ||
this._readAnnotations(schema); | ||
schema.name = tk.next({id: 'name'}).val; | ||
if (tk.next({val: '=', silent: true})) { | ||
schema['default'] = tk.next({id: 'json'}).val; | ||
} | ||
if (isOptional) { | ||
schema.type = 'default' in schema && schema.default !== null ? [schema.type, 'null'] : ['null', schema.type]; | ||
} | ||
return schema; | ||
} | ||
this._readAnnotations(schema); | ||
schema.name = tk.next({id: 'name'}).val; | ||
if (tk.next({val: '=', silent: true})) { | ||
schema['default'] = tk.next({id: 'json'}).val; | ||
} | ||
return schema; | ||
}; | ||
Reader.prototype._readType = function (schema) { | ||
schema = schema || {}; | ||
this._readAnnotations(schema); | ||
schema.type = this._tk.next({id: 'name'}).val; | ||
switch (schema.type) { | ||
case 'record': | ||
case 'error': | ||
return this._readRecord(schema); | ||
case 'fixed': | ||
return this._readFixed(schema); | ||
case 'enum': | ||
return this._readEnum(schema); | ||
case 'map': | ||
return this._readMap(schema); | ||
case 'array': | ||
return this._readArray(schema); | ||
case 'union': | ||
if (Object.keys(schema).length > 1) { | ||
throw new Error('union annotations are not supported'); | ||
_readType (schema, top) { | ||
schema = schema || {}; | ||
this._readAnnotations(schema); | ||
schema.type = this._tk.next({id: 'name'}).val; | ||
switch (schema.type) { | ||
case 'record': | ||
case 'error': | ||
return this._readRecord(schema); | ||
case 'fixed': | ||
return this._readFixed(schema); | ||
case 'enum': | ||
return this._readEnum(schema, top); | ||
case 'map': | ||
return this._readMap(schema); | ||
case 'array': | ||
return this._readArray(schema); | ||
case 'union': | ||
if (Object.keys(schema).length > 1) { | ||
throw new Error('union annotations are not supported'); | ||
} | ||
return this._readUnion(); | ||
default: { | ||
// Reference. | ||
let ref = this._typeRefs[schema.type]; | ||
if (ref) { | ||
delete schema.type; // Always overwrite the type. | ||
utils.copyOwnProperties(ref, schema); | ||
} | ||
return Object.keys(schema).length > 1 ? schema : schema.type; | ||
} | ||
return this._readUnion(); | ||
default: | ||
// Reference. | ||
var ref = this._typeRefs[schema.type]; | ||
if (ref) { | ||
delete schema.type; // Always overwrite the type. | ||
utils.copyOwnProperties(ref, schema); | ||
} | ||
return Object.keys(schema).length > 1 ? schema : schema.type; | ||
} | ||
} | ||
}; | ||
Reader.prototype._readFixed = function (schema) { | ||
var tk = this._tk; | ||
if (!tk.next({val: '(', silent: true})) { | ||
schema.name = tk.next({id: 'name'}).val; | ||
tk.next({val: '('}); | ||
_readFixed (schema) { | ||
let tk = this._tk; | ||
if (!tk.next({val: '(', silent: true})) { | ||
schema.name = tk.next({id: 'name'}).val; | ||
tk.next({val: '('}); | ||
} | ||
schema.size = parseInt(tk.next({id: 'number'}).val); | ||
tk.next({val: ')'}); | ||
return schema; | ||
} | ||
schema.size = parseInt(tk.next({id: 'number'}).val); | ||
tk.next({val: ')'}); | ||
return schema; | ||
}; | ||
Reader.prototype._readMap = function (schema) { | ||
var tk = this._tk; | ||
// Brackets are unwieldy when declaring inline types. We allow for them to be | ||
// omitted (but we keep the consistency that if the entry bracket is present, | ||
// the exit one must be as well). Note that this is non-standard. | ||
var silent = this._implicitTags; | ||
var implicitTags = tk.next({val: '<', silent: silent}) === undefined; | ||
schema.values = this._readType(); | ||
tk.next({val: '>', silent: implicitTags}); | ||
return schema; | ||
}; | ||
_readMap (schema) { | ||
let tk = this._tk; | ||
// Brackets are unwieldy when declaring inline types. We allow for them to | ||
// be omitted (but we keep the consistency that if the entry bracket is | ||
// present, the exit one must be as well). Note that this is non-standard. | ||
let silent = this._implicitTags; | ||
let implicitTags = tk.next({val: '<', silent}) === undefined; | ||
schema.values = this._readType(); | ||
tk.next({val: '>', silent: implicitTags}); | ||
return schema; | ||
} | ||
Reader.prototype._readArray = function (schema) { | ||
var tk = this._tk; | ||
var silent = this._implicitTags; | ||
var implicitTags = tk.next({val: '<', silent: silent}) === undefined; | ||
schema.items = this._readType(); | ||
tk.next({val: '>', silent: implicitTags}); | ||
return schema; | ||
}; | ||
_readArray (schema) { | ||
let tk = this._tk; | ||
let silent = this._implicitTags; | ||
let implicitTags = tk.next({val: '<', silent}) === undefined; | ||
schema.items = this._readType(); | ||
tk.next({val: '>', silent: implicitTags}); | ||
return schema; | ||
} | ||
Reader.prototype._readEnum = function (schema) { | ||
var tk = this._tk; | ||
if (!tk.next({val: '{', silent: true})) { | ||
schema.name = tk.next({id: 'name'}).val; | ||
tk.next({val: '{'}); | ||
_readEnum (schema, top) { | ||
let tk = this._tk; | ||
if (!tk.next({val: '{', silent: true})) { | ||
schema.name = tk.next({id: 'name'}).val; | ||
tk.next({val: '{'}); | ||
} | ||
schema.symbols = []; | ||
do { | ||
schema.symbols.push(tk.next().val); | ||
} while (!tk.next({val: '}', silent: true}) && tk.next({val: ','})); | ||
// To avoid confusing syntax, reader enums (i.e. enums with a default value) | ||
// can only be defined top-level. | ||
if (top && tk.next({val: '=', silent: true})) { | ||
schema.default = tk.next().val; | ||
tk.next({val: ';'}); | ||
} | ||
return schema; | ||
} | ||
schema.symbols = []; | ||
do { | ||
schema.symbols.push(tk.next().val); | ||
} while (!tk.next({val: '}', silent: true}) && tk.next({val: ','})); | ||
return schema; | ||
}; | ||
Reader.prototype._readUnion = function () { | ||
var tk = this._tk; | ||
var arr = []; | ||
tk.next({val: '{'}); | ||
do { | ||
arr.push(this._readType()); | ||
} while (!tk.next({val: '}', silent: true}) && tk.next({val: ','})); | ||
return arr; | ||
}; | ||
Reader.prototype._readRecord = function (schema) { | ||
var tk = this._tk; | ||
if (!tk.next({val: '{', silent: true})) { | ||
schema.name = tk.next({id: 'name'}).val; | ||
_readUnion () { | ||
let tk = this._tk; | ||
let arr = []; | ||
tk.next({val: '{'}); | ||
do { | ||
arr.push(this._readType()); | ||
} while (!tk.next({val: '}', silent: true}) && tk.next({val: ','})); | ||
return arr; | ||
} | ||
schema.fields = []; | ||
while (!tk.next({val: '}', silent: true})) { | ||
schema.fields.push(this._readField()); | ||
tk.next({val: ';'}); | ||
_readRecord (schema) { | ||
let tk = this._tk; | ||
if (!tk.next({val: '{', silent: true})) { | ||
schema.name = tk.next({id: 'name'}).val; | ||
tk.next({val: '{'}); | ||
} | ||
schema.fields = []; | ||
while (!tk.next({val: '}', silent: true})) { | ||
schema.fields.push(this._readField()); | ||
tk.next({val: ';'}); | ||
} | ||
return schema; | ||
} | ||
return schema; | ||
}; | ||
Reader.prototype._readImports = function (imports, maybeMessage) { | ||
var tk = this._tk; | ||
var numImports = 0; | ||
var pos = tk.pos; | ||
while (tk.next({val: 'import', silent: true})) { | ||
if (!numImports && maybeMessage && tk.next({val: '(', silent: true})) { | ||
// This will happen if a message is named import. | ||
tk.pos = pos; | ||
return; | ||
_readImports (imports, maybeMessage) { | ||
let tk = this._tk; | ||
let numImports = 0; | ||
let pos = tk.pos; | ||
while (tk.next({val: 'import', silent: true})) { | ||
if (!numImports && maybeMessage && tk.next({val: '(', silent: true})) { | ||
// This will happen if a message is named import. | ||
tk.pos = pos; | ||
return; | ||
} | ||
let kind = tk.next({id: 'name'}).val; | ||
let fname = JSON.parse(tk.next({id: 'string'}).val); | ||
tk.next({val: ';'}); | ||
imports.push({kind, name: fname}); | ||
numImports++; | ||
} | ||
var kind = tk.next({id: 'name'}).val; | ||
var fname = JSON.parse(tk.next({id: 'string'}).val); | ||
tk.next({val: ';'}); | ||
imports.push({kind: kind, name: fname}); | ||
numImports++; | ||
return numImports; | ||
} | ||
return numImports; | ||
}; | ||
} | ||
@@ -550,165 +590,169 @@ // Helpers. | ||
*/ | ||
function Tokenizer(str) { | ||
this._str = str; | ||
this.pos = 0; | ||
} | ||
class Tokenizer { | ||
constructor (str) { | ||
this._str = str; | ||
this.pos = 0; | ||
} | ||
Tokenizer.prototype.next = function (opts) { | ||
var token = {pos: this.pos, id: undefined, val: undefined}; | ||
var javadoc = this._skip(opts && opts.emitJavadoc); | ||
if (javadoc) { | ||
token.id = 'javadoc'; | ||
token.val = javadoc; | ||
} else { | ||
var pos = this.pos; | ||
var str = this._str; | ||
var c = str.charAt(pos); | ||
if (!c) { | ||
token.id = '(eof)'; | ||
next (opts) { | ||
let token = {pos: this.pos, id: undefined, val: undefined}; | ||
let javadoc = this._skip(opts && opts.emitJavadoc); | ||
if (typeof javadoc == 'string') { | ||
token.id = 'javadoc'; | ||
token.val = javadoc; | ||
} else { | ||
if (opts && opts.id === 'json') { | ||
token.id = 'json'; | ||
this.pos = this._endOfJson(); | ||
} else if (c === '"') { | ||
token.id = 'string'; | ||
this.pos = this._endOfString(); | ||
} else if (/[0-9]/.test(c)) { | ||
token.id = 'number'; | ||
this.pos = this._endOf(/[0-9]/); | ||
} else if (/[`A-Za-z_.]/.test(c)) { | ||
token.id = 'name'; | ||
this.pos = this._endOf(/[`A-Za-z0-9_.]/); | ||
let pos = this.pos; | ||
let str = this._str; | ||
let c = str.charAt(pos); | ||
if (!c) { | ||
token.id = '(eof)'; | ||
} else { | ||
token.id = 'operator'; | ||
this.pos = pos + 1; | ||
} | ||
token.val = str.slice(pos, this.pos); | ||
if (token.id === 'json') { | ||
// Let's be nice and give a more helpful error message when this occurs | ||
// (JSON parsing errors wouldn't let us find the location otherwise). | ||
try { | ||
token.val = JSON.parse(token.val); | ||
} catch (err) { | ||
throw this.error('invalid JSON', token); | ||
if (opts && opts.id === 'json') { | ||
token.id = 'json'; | ||
this.pos = this._endOfJson(); | ||
} else if (c === '"') { | ||
token.id = 'string'; | ||
this.pos = this._endOfString(); | ||
} else if (/[0-9]/.test(c)) { | ||
token.id = 'number'; | ||
this.pos = this._endOf(/[0-9]/); | ||
} else if (/[`A-Za-z_.]/.test(c)) { | ||
token.id = 'name'; | ||
this.pos = this._endOf(/[`A-Za-z0-9_.]/); | ||
} else { | ||
token.id = 'operator'; | ||
this.pos = pos + 1; | ||
} | ||
} else if (token.id === 'name') { | ||
// Unescape names (our parser doesn't need them). | ||
token.val = token.val.replace(/`/g, ''); | ||
token.val = str.slice(pos, this.pos); | ||
if (token.id === 'json') { | ||
// Let's be nice and give a more helpful error message when this | ||
// occurs (JSON parsing errors wouldn't let us find the location | ||
// otherwise). | ||
try { | ||
token.val = JSON.parse(token.val); | ||
} catch (err) { | ||
throw this.error('invalid JSON', token); | ||
} | ||
} else if (token.id === 'name') { | ||
// Unescape names (our parser doesn't need them). | ||
token.val = token.val.replace(/`/g, ''); | ||
} | ||
} | ||
} | ||
} | ||
var err; | ||
if (opts && opts.id && opts.id !== token.id) { | ||
err = this.error(f('expected ID %s', opts.id), token); | ||
} else if (opts && opts.val && opts.val !== token.val) { | ||
err = this.error(f('expected value %s', opts.val), token); | ||
let err; | ||
if (opts && opts.id && opts.id !== token.id) { | ||
err = this.error(`expected ID ${opts.id}`, token); | ||
} else if (opts && opts.val && opts.val !== token.val) { | ||
err = this.error(`expected value ${opts.val}`, token); | ||
} | ||
if (!err) { | ||
return token; | ||
} else if (opts && opts.silent) { | ||
this.pos = token.pos; // Backtrack to start of token. | ||
return undefined; | ||
} else { | ||
throw err; | ||
} | ||
} | ||
if (!err) { | ||
return token; | ||
} else if (opts && opts.silent) { | ||
this.pos = token.pos; // Backtrack to start of token. | ||
return undefined; | ||
} else { | ||
throw err; | ||
} | ||
}; | ||
Tokenizer.prototype.error = function (reason, context) { | ||
// Context must be either a token or a position. | ||
var isToken = typeof context != 'number'; | ||
var pos = isToken ? context.pos : context; | ||
var str = this._str; | ||
var lineNum = 1; | ||
var lineStart = 0; | ||
var i; | ||
for (i = 0; i < pos; i++) { | ||
if (str.charAt(i) === '\n') { | ||
lineNum++; | ||
lineStart = i; | ||
error (reason, context) { | ||
// Context must be either a token or a position. | ||
let isToken = typeof context != 'number'; | ||
let pos = isToken ? context.pos : context; | ||
let str = this._str; | ||
let lineNum = 1; | ||
let lineStart = 0; | ||
for (let i = 0; i < pos; i++) { | ||
if (str.charAt(i) === '\n') { | ||
lineNum++; | ||
lineStart = i; | ||
} | ||
} | ||
let msg = isToken ? | ||
`invalid token ${utils.printJSON(context)}: ${reason}` : | ||
reason; | ||
let err = new Error(msg); | ||
err.token = isToken ? context : undefined; | ||
err.lineNum = lineNum; | ||
err.colNum = pos - lineStart; | ||
return err; | ||
} | ||
var msg = isToken ? f('invalid token %j: %s', context, reason) : reason; | ||
var err = new Error(msg); | ||
err.token = isToken ? context : undefined; | ||
err.lineNum = lineNum; | ||
err.colNum = pos - lineStart; | ||
return err; | ||
}; | ||
/** Skip whitespace and comments. */ | ||
Tokenizer.prototype._skip = function (emitJavadoc) { | ||
var str = this._str; | ||
var isJavadoc = false; | ||
var pos, c; | ||
/** Skip whitespace and comments. */ | ||
_skip (emitJavadoc) { | ||
let str = this._str; | ||
let isJavadoc = false; | ||
let c; | ||
while ((c = str.charAt(this.pos)) && /\s/.test(c)) { | ||
this.pos++; | ||
} | ||
pos = this.pos; | ||
if (c === '/') { | ||
switch (str.charAt(this.pos + 1)) { | ||
case '/': | ||
this.pos += 2; | ||
while ((c = str.charAt(this.pos)) && c !== '\n') { | ||
this.pos++; | ||
} | ||
return this._skip(emitJavadoc); | ||
case '*': | ||
this.pos += 2; | ||
if (str.charAt(this.pos) === '*') { | ||
isJavadoc = true; | ||
} | ||
while ((c = str.charAt(this.pos++))) { | ||
if (c === '*' && str.charAt(this.pos) === '/') { | ||
this.pos++; | ||
if (isJavadoc && emitJavadoc) { | ||
return extractJavadoc(str.slice(pos + 3, this.pos - 2)); | ||
while ((c = str.charAt(this.pos)) && /\s/.test(c)) { | ||
this.pos++; | ||
} | ||
let pos = this.pos; | ||
if (c === '/') { | ||
switch (str.charAt(this.pos + 1)) { | ||
case '/': | ||
this.pos += 2; | ||
while ((c = str.charAt(this.pos)) && c !== '\n') { | ||
this.pos++; | ||
} | ||
return this._skip(emitJavadoc); | ||
} | ||
case '*': | ||
this.pos += 2; | ||
if (str.charAt(this.pos) === '*') { | ||
isJavadoc = true; | ||
} | ||
while ((c = str.charAt(this.pos++))) { | ||
if (c === '*' && str.charAt(this.pos) === '/') { | ||
this.pos++; | ||
if (isJavadoc && emitJavadoc) { | ||
return extractJavadoc(str.slice(pos + 3, this.pos - 2)); | ||
} | ||
return this._skip(emitJavadoc); | ||
} | ||
} | ||
throw this.error('unterminated comment', pos); | ||
} | ||
throw this.error('unterminated comment', pos); | ||
} | ||
} | ||
}; | ||
/** Generic end of method. */ | ||
Tokenizer.prototype._endOf = function (pat) { | ||
var pos = this.pos; | ||
var str = this._str; | ||
while (pat.test(str.charAt(pos))) { | ||
pos++; | ||
/** Generic end of method. */ | ||
_endOf (pat) { | ||
let pos = this.pos; | ||
let str = this._str; | ||
while (pat.test(str.charAt(pos))) { | ||
pos++; | ||
} | ||
return pos; | ||
} | ||
return pos; | ||
}; | ||
/** Find end of a string. */ | ||
Tokenizer.prototype._endOfString = function () { | ||
var pos = this.pos + 1; // Skip first double quote. | ||
var str = this._str; | ||
var c; | ||
while ((c = str.charAt(pos))) { | ||
if (c === '"') { | ||
// The spec doesn't explicitly say so, but IDLs likely only | ||
// allow double quotes for strings (C- and Java-style). | ||
return pos + 1; | ||
/** Find end of a string. */ | ||
_endOfString () { | ||
let pos = this.pos + 1; // Skip first double quote. | ||
let str = this._str; | ||
let c; | ||
while ((c = str.charAt(pos))) { | ||
if (c === '"') { | ||
// The spec doesn't explicitly say so, but IDLs likely only | ||
// allow double quotes for strings (C- and Java-style). | ||
return pos + 1; | ||
} | ||
if (c === '\\') { | ||
pos += 2; | ||
} else { | ||
pos++; | ||
} | ||
} | ||
if (c === '\\') { | ||
pos += 2; | ||
} else { | ||
pos++; | ||
} | ||
throw this.error('unterminated string', pos - 1); | ||
} | ||
throw this.error('unterminated string', pos - 1); | ||
}; | ||
/** Find end of JSON object, throwing an error if the end is reached first. */ | ||
Tokenizer.prototype._endOfJson = function () { | ||
var pos = utils.jsonEnd(this._str, this.pos); | ||
if (pos < 0) { | ||
throw this.error('invalid JSON', pos); | ||
/** Find end of JSON object, throwing an error if the end is reached first. */ | ||
_endOfJson () { | ||
let pos = utils.jsonEnd(this._str, this.pos); | ||
if (pos < 0) { | ||
throw this.error('invalid JSON', pos); | ||
} | ||
return pos; | ||
} | ||
return pos; | ||
}; | ||
} | ||
@@ -723,11 +767,11 @@ /** | ||
function extractJavadoc(str) { | ||
var lines = str | ||
let lines = str | ||
.replace(/^[ \t]+|[ \t]+$/g, '') // Trim whitespace. | ||
.split('\n').map(function (line, i) { | ||
.split('\n').map((line, i) => { | ||
return i ? line.replace(/^\s*\*\s?/, '') : line; | ||
}); | ||
while (!lines[0]) { | ||
while (lines.length && !lines[0]) { | ||
lines.shift(); | ||
} | ||
while (!lines[lines.length - 1]) { | ||
while (lines.length && !lines[lines.length - 1]) { | ||
lines.pop(); | ||
@@ -738,9 +782,18 @@ } | ||
/** Returns the namespace generated by a protocol. */ | ||
function protocolNamespace(protocol) { | ||
if (protocol.namespace) { | ||
return protocol.namespace; | ||
} | ||
let match = /^(.*)\.[^.]+$/.exec(protocol.protocol); | ||
return match ? match[1] : undefined; | ||
} | ||
module.exports = { | ||
Tokenizer: Tokenizer, | ||
assembleProtocol: assembleProtocol, | ||
read: read, | ||
Tokenizer, | ||
assembleProtocol, | ||
read, | ||
readProtocol: Reader.readProtocol, | ||
readSchema: Reader.readSchema | ||
}; |
1421
lib/utils.js
@@ -1,3 +0,1 @@ | ||
/* jshint node: true */ | ||
// TODO: Make long comparison impervious to precision loss. | ||
@@ -10,35 +8,12 @@ // TODO: Optimize binary comparison methods. | ||
var crypto = require('crypto'); | ||
let platform = require('./platform'); | ||
// Shared buffer pool for all taps. | ||
var POOL = new BufferPool(4096); | ||
// Valid (field, type, and symbol) name regex. | ||
const NAME_PATTERN = /^[A-Za-z_][A-Za-z0-9_]*$/; | ||
/** | ||
* Create a new empty buffer. | ||
* | ||
* @param size {Number} The buffer's size. | ||
*/ | ||
function newBuffer(size) { | ||
if (typeof Buffer.alloc == 'function') { | ||
return Buffer.alloc(size); | ||
} else { | ||
return new Buffer(size); | ||
} | ||
function isBufferLike(data) { | ||
return (data instanceof Uint8Array); | ||
} | ||
/** | ||
* Create a new buffer with the input contents. | ||
* | ||
* @param data {Array|String} The buffer's data. | ||
* @param enc {String} Encoding, used if data is a string. | ||
*/ | ||
function bufferFrom(data, enc) { | ||
if (typeof Buffer.from == 'function') { | ||
return Buffer.from(data, enc); | ||
} else { | ||
return new Buffer(data, enc); | ||
} | ||
} | ||
/** | ||
* Uppercase the first letter of a string. | ||
@@ -58,2 +33,29 @@ * | ||
let bufCompare, bufEqual; | ||
if (typeof Buffer == 'function') { | ||
bufCompare = Buffer.compare; | ||
bufEqual = function(buf1, buf2) { | ||
return Buffer.prototype.equals.call(buf1, buf2); | ||
}; | ||
} else { | ||
bufCompare = function(buf1, buf2) { | ||
if (buf1 === buf2) { | ||
return 0; | ||
} | ||
let len = Math.min(buf1.length, buf2.length); | ||
for (let i = 0; i < len; i++) { | ||
if (buf1[i] !== buf2[i]) { | ||
return Math.sign(buf1[i] - buf2[i]); | ||
} | ||
} | ||
return Math.sign(buf1.length - buf2.length); | ||
}; | ||
bufEqual = function(buf1, buf2) { | ||
if (buf1.length !== buf2.length) { | ||
return false; | ||
} | ||
return bufCompare(buf1, buf2) === 0; | ||
}; | ||
} | ||
/** | ||
@@ -70,6 +72,3 @@ * Get option or default if undefined. | ||
function getOption(opts, key, def) { | ||
if (opts === undefined) { | ||
return def; | ||
} | ||
var value = opts[key]; | ||
let value = opts[key]; | ||
return value === undefined ? def : value; | ||
@@ -79,15 +78,2 @@ } | ||
/** | ||
* Compute a string's hash. | ||
* | ||
* @param str {String} The string to hash. | ||
* @param algorithm {String} The algorithm used. Defaults to MD5. | ||
*/ | ||
function getHash(str, algorithm) { | ||
algorithm = algorithm || 'md5'; | ||
var hash = crypto.createHash(algorithm); | ||
hash.end(str); | ||
return hash.read(); | ||
} | ||
/** | ||
* Find index of value in array. | ||
@@ -101,8 +87,7 @@ * | ||
function singleIndexOf(arr, v) { | ||
var pos = -1; | ||
var i, l; | ||
let pos = -1; | ||
if (!arr) { | ||
return -1; | ||
} | ||
for (i = 0, l = arr.length; i < l; i++) { | ||
for (let i = 0, l = arr.length; i < l; i++) { | ||
if (arr[i] === v) { | ||
@@ -125,6 +110,5 @@ if (pos >= 0) { | ||
function toMap(arr, fn) { | ||
var obj = {}; | ||
var i, elem; | ||
for (i = 0; i < arr.length; i++) { | ||
elem = arr[i]; | ||
let obj = {}; | ||
for (let i = 0; i < arr.length; i++) { | ||
let elem = arr[i]; | ||
obj[fn(elem)] = elem; | ||
@@ -141,3 +125,3 @@ } | ||
function objectValues(obj) { | ||
return Object.keys(obj).map(function (key) { return obj[key]; }); | ||
return Object.keys(obj).map((key) => { return obj[key]; }); | ||
} | ||
@@ -152,6 +136,5 @@ | ||
function hasDuplicates(arr, fn) { | ||
var obj = {}; | ||
var i, l, elem; | ||
for (i = 0, l = arr.length; i < l; i++) { | ||
elem = arr[i]; | ||
let obj = Object.create(null); | ||
for (let i = 0, l = arr.length; i < l; i++) { | ||
let elem = arr[i]; | ||
if (fn) { | ||
@@ -177,11 +160,7 @@ elem = fn(elem); | ||
function copyOwnProperties(src, dst, overwrite) { | ||
if (!src) { | ||
return dst; | ||
} | ||
var names = Object.getOwnPropertyNames(src); | ||
var i, l, name; | ||
for (i = 0, l = names.length; i < l; i++) { | ||
name = names[i]; | ||
if (!dst.hasOwnProperty(name) || overwrite) { | ||
var descriptor = Object.getOwnPropertyDescriptor(src, name); | ||
let names = Object.getOwnPropertyNames(src); | ||
for (let i = 0, l = names.length; i < l; i++) { | ||
let name = names[i]; | ||
if (!Object.prototype.hasOwnProperty.call(dst, name) || overwrite) { | ||
let descriptor = Object.getOwnPropertyDescriptor(src, name); | ||
Object.defineProperty(dst, name, descriptor); | ||
@@ -194,2 +173,49 @@ } | ||
/** | ||
* Check whether a string is a valid Avro identifier. | ||
*/ | ||
function isValidName(str) { return NAME_PATTERN.test(str); } | ||
/** | ||
* Verify and return fully qualified name. | ||
* | ||
* @param name {String} Full or short name. It can be prefixed with a dot to | ||
* force global namespace. | ||
* @param namespace {String} Optional namespace. | ||
*/ | ||
function qualify(name, namespace) { | ||
if (~name.indexOf('.')) { | ||
name = name.replace(/^\./, ''); // Allow absolute referencing. | ||
} else if (namespace) { | ||
name = namespace + '.' + name; | ||
} | ||
name.split('.').forEach((part) => { | ||
if (!isValidName(part)) { | ||
throw new Error(`invalid name: ${printJSON(name)}`); | ||
} | ||
}); | ||
return name; | ||
} | ||
/** | ||
* Remove namespace from a name. | ||
* | ||
* @param name {String} Full or short name. | ||
*/ | ||
function unqualify(name) { | ||
let parts = name.split('.'); | ||
return parts[parts.length - 1]; | ||
} | ||
/** | ||
* Return the namespace implied by a name. | ||
* | ||
* @param name {String} Full or short name. If short, the returned namespace | ||
* will be empty. | ||
*/ | ||
function impliedNamespace(name) { | ||
let match = /^(.*)\.[^.]+$/.exec(name); | ||
return match ? match[1] : undefined; | ||
} | ||
/** | ||
* Returns offset in the string of the end of JSON object (-1 if past the end). | ||
@@ -209,3 +235,3 @@ * | ||
// Handle the case of a simple literal separately. | ||
var c = str.charAt(pos++); | ||
let c = str.charAt(pos++); | ||
if (/[\d-]/.test(c)) { | ||
@@ -223,24 +249,24 @@ while (/[eE\d.+-]/.test(str.charAt(pos))) { | ||
// String, object, or array. | ||
var depth = 0; | ||
var literal = false; | ||
let depth = 0; | ||
let literal = false; | ||
do { | ||
switch (c) { | ||
case '{': | ||
case '[': | ||
if (!literal) { depth++; } | ||
break; | ||
case '}': | ||
case ']': | ||
if (!literal && !--depth) { | ||
return pos; | ||
} | ||
break; | ||
case '"': | ||
literal = !literal; | ||
if (!depth && !literal) { | ||
return pos; | ||
} | ||
break; | ||
case '\\': | ||
pos++; // Skip the next character. | ||
case '{': | ||
case '[': | ||
if (!literal) { depth++; } | ||
break; | ||
case '}': | ||
case ']': | ||
if (!literal && !--depth) { | ||
return pos; | ||
} | ||
break; | ||
case '"': | ||
literal = !literal; | ||
if (!depth && !literal) { | ||
return pos; | ||
} | ||
break; | ||
case '\\': | ||
pos++; // Skip the next character. | ||
} | ||
@@ -256,25 +282,2 @@ } while ((c = str.charAt(pos++))); | ||
/** | ||
* Simple buffer pool to avoid allocating many small buffers. | ||
* | ||
* This provides significant speedups in recent versions of node (6+). | ||
*/ | ||
function BufferPool(len) { | ||
this._len = len | 0; | ||
this._pos = 0; | ||
this._slab = newBuffer(this._len); | ||
} | ||
BufferPool.prototype.alloc = function (len) { | ||
var maxLen = this._len; | ||
if (len > maxLen) { | ||
return newBuffer(len); | ||
} | ||
if (this._pos + len > maxLen) { | ||
this._slab = newBuffer(maxLen); | ||
this._pos = 0; | ||
} | ||
return this._slab.slice(this._pos, this._pos += len); | ||
}; | ||
/** | ||
* Generator of random things. | ||
@@ -284,74 +287,77 @@ * | ||
*/ | ||
function Lcg(seed) { | ||
var a = 1103515245; | ||
var c = 12345; | ||
var m = Math.pow(2, 31); | ||
var state = Math.floor(seed || Math.random() * (m - 1)); | ||
class Lcg { | ||
constructor (seed) { | ||
let a = 1103515245; | ||
let c = 12345; | ||
let m = Math.pow(2, 31); | ||
let state = Math.floor(seed || Math.random() * (m - 1)); | ||
this._max = m; | ||
this._nextInt = function () { return state = (a * state + c) % m; }; | ||
} | ||
this._max = m; | ||
this._nextInt = function () { | ||
state = (a * state + c) % m; | ||
return state; | ||
}; | ||
} | ||
Lcg.prototype.nextBoolean = function () { | ||
// jshint -W018 | ||
return !!(this._nextInt() % 2); | ||
}; | ||
nextBoolean () { | ||
return !!(this._nextInt() % 2); | ||
} | ||
Lcg.prototype.nextInt = function (start, end) { | ||
if (end === undefined) { | ||
end = start; | ||
start = 0; | ||
nextInt (start, end) { | ||
if (end === undefined) { | ||
end = start; | ||
start = 0; | ||
} | ||
end = end === undefined ? this._max : end; | ||
return start + Math.floor(this.nextFloat() * (end - start)); | ||
} | ||
end = end === undefined ? this._max : end; | ||
return start + Math.floor(this.nextFloat() * (end - start)); | ||
}; | ||
Lcg.prototype.nextFloat = function (start, end) { | ||
if (end === undefined) { | ||
end = start; | ||
start = 0; | ||
nextFloat (start, end) { | ||
if (end === undefined) { | ||
end = start; | ||
start = 0; | ||
} | ||
end = end === undefined ? 1 : end; | ||
return start + (end - start) * this._nextInt() / this._max; | ||
} | ||
end = end === undefined ? 1 : end; | ||
return start + (end - start) * this._nextInt() / this._max; | ||
}; | ||
Lcg.prototype.nextString = function(len, flags) { | ||
len |= 0; | ||
flags = flags || 'aA'; | ||
var mask = ''; | ||
if (flags.indexOf('a') > -1) { | ||
mask += 'abcdefghijklmnopqrstuvwxyz'; | ||
nextString(len, flags) { | ||
len |= 0; | ||
flags = flags || 'aA'; | ||
let mask = ''; | ||
if (flags.indexOf('a') > -1) { | ||
mask += 'abcdefghijklmnopqrstuvwxyz'; | ||
} | ||
if (flags.indexOf('A') > -1) { | ||
mask += 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'; | ||
} | ||
if (flags.indexOf('#') > -1) { | ||
mask += '0123456789'; | ||
} | ||
if (flags.indexOf('!') > -1) { | ||
mask += '~`!@#$%^&*()_+-={}[]:";\'<>?,./|\\'; | ||
} | ||
let result = []; | ||
for (let i = 0; i < len; i++) { | ||
result.push(this.choice(mask)); | ||
} | ||
return result.join(''); | ||
} | ||
if (flags.indexOf('A') > -1) { | ||
mask += 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'; | ||
} | ||
if (flags.indexOf('#') > -1) { | ||
mask += '0123456789'; | ||
} | ||
if (flags.indexOf('!') > -1) { | ||
mask += '~`!@#$%^&*()_+-={}[]:";\'<>?,./|\\'; | ||
} | ||
var result = []; | ||
for (var i = 0; i < len; i++) { | ||
result.push(this.choice(mask)); | ||
} | ||
return result.join(''); | ||
}; | ||
Lcg.prototype.nextBuffer = function (len) { | ||
var arr = []; | ||
var i; | ||
for (i = 0; i < len; i++) { | ||
arr.push(this.nextInt(256)); | ||
nextBuffer (len) { | ||
let arr = new Uint8Array(len); | ||
for (let i = 0; i < len; i++) { | ||
arr[i] = this.nextInt(256); | ||
} | ||
return arr; | ||
} | ||
return bufferFrom(arr); | ||
}; | ||
Lcg.prototype.choice = function (arr) { | ||
var len = arr.length; | ||
if (!len) { | ||
throw new Error('choosing from empty array'); | ||
choice (arr) { | ||
let len = arr.length; | ||
if (!len) { | ||
throw new Error('choosing from empty array'); | ||
} | ||
return arr[this.nextInt(len)]; | ||
} | ||
return arr[this.nextInt(len)]; | ||
}; | ||
} | ||
@@ -364,59 +370,164 @@ /** | ||
*/ | ||
function OrderedQueue() { | ||
this._index = 0; | ||
this._items = []; | ||
} | ||
class OrderedQueue { | ||
constructor () { | ||
this._index = 0; | ||
this._items = []; | ||
} | ||
OrderedQueue.prototype.push = function (item) { | ||
var items = this._items; | ||
var i = items.length | 0; | ||
var j; | ||
items.push(item); | ||
while (i > 0 && items[i].index < items[j = ((i - 1) >> 1)].index) { | ||
item = items[i]; | ||
items[i] = items[j]; | ||
items[j] = item; | ||
i = j; | ||
push (item) { | ||
let items = this._items; | ||
let i = items.length | 0; | ||
let j; | ||
items.push(item); | ||
while (i > 0 && items[i].index < items[j = ((i - 1) >> 1)].index) { | ||
item = items[i]; | ||
items[i] = items[j]; | ||
items[j] = item; | ||
i = j; | ||
} | ||
} | ||
}; | ||
OrderedQueue.prototype.pop = function () { | ||
var items = this._items; | ||
var len = (items.length - 1) | 0; | ||
var first = items[0]; | ||
if (!first || first.index > this._index) { | ||
return null; | ||
} | ||
this._index++; | ||
if (!len) { | ||
items.pop(); | ||
pop () { | ||
let items = this._items; | ||
let len = (items.length - 1) | 0; | ||
let first = items[0]; | ||
if (!first || first.index > this._index) { | ||
return null; | ||
} | ||
this._index++; | ||
if (!len) { | ||
items.pop(); | ||
return first; | ||
} | ||
items[0] = items.pop(); | ||
let mid = len >> 1; | ||
let i = 0; | ||
let i1, i2, j, item, c, c1, c2; | ||
while (i < mid) { | ||
item = items[i]; | ||
i1 = (i << 1) + 1; | ||
i2 = (i + 1) << 1; | ||
c1 = items[i1]; | ||
c2 = items[i2]; | ||
if (!c2 || c1.index <= c2.index) { | ||
c = c1; | ||
j = i1; | ||
} else { | ||
c = c2; | ||
j = i2; | ||
} | ||
if (c.index >= item.index) { | ||
break; | ||
} | ||
items[j] = item; | ||
items[i] = c; | ||
i = j; | ||
} | ||
return first; | ||
} | ||
items[0] = items.pop(); | ||
var mid = len >> 1; | ||
var i = 0; | ||
var i1, i2, j, item, c, c1, c2; | ||
while (i < mid) { | ||
item = items[i]; | ||
i1 = (i << 1) + 1; | ||
i2 = (i + 1) << 1; | ||
c1 = items[i1]; | ||
c2 = items[i2]; | ||
if (!c2 || c1.index <= c2.index) { | ||
c = c1; | ||
j = i1; | ||
} else { | ||
c = c2; | ||
j = i2; | ||
} | ||
let decodeSlice; | ||
if (typeof Buffer === 'function' && typeof Buffer.prototype.utf8Slice === 'function') { | ||
// Note that calling `Buffer.prototype.toString.call(buf, 'utf-8')` on a | ||
// `Uint8Array` throws because Node's internal implementation expects the | ||
// argument to be a `Buffer` specifically. | ||
decodeSlice = Function.prototype.call.bind(Buffer.prototype.utf8Slice); | ||
} else { | ||
const DECODER = new TextDecoder(); | ||
decodeSlice = function(arr, start, end) { | ||
return DECODER.decode(arr.subarray(start, end)); | ||
}; | ||
} | ||
const ENCODER = new TextEncoder(); | ||
const encodeBuf = new Uint8Array(4096); | ||
const encodeBufs = []; | ||
function encodeSlice(str) { | ||
const {read, written} = ENCODER.encodeInto(str, encodeBuf); | ||
if (read === str.length) { | ||
// Believe it or not, `subarray` is actually quite expensive. To avoid the | ||
// cost, we cache and reuse `subarray`s. | ||
if (!encodeBufs[written]) { | ||
encodeBufs[written] = encodeBuf.subarray(0, written); | ||
} | ||
if (c.index >= item.index) { | ||
break; | ||
} | ||
items[j] = item; | ||
items[i] = c; | ||
i = j; | ||
return encodeBufs[written]; | ||
} | ||
return first; | ||
}; | ||
return ENCODER.encode(str); | ||
} | ||
let utf8Length; | ||
if (typeof Buffer === 'function') { | ||
utf8Length = Buffer.byteLength; | ||
} else { | ||
utf8Length = function(str) { | ||
let len = 0; | ||
for (;;) { | ||
// encodeInto is faster than any manual implementation (or even | ||
// Buffer.byteLength), provided the string fits entirely within the | ||
// buffer. Past that, it slows down but is still faster than other | ||
// options. | ||
const {read, written} = ENCODER.encodeInto(str, encodeBuf); | ||
len += written; | ||
if (read === str.length) break; | ||
str = str.slice(read); | ||
} | ||
return len; | ||
}; | ||
} | ||
let bufferToBinaryString; | ||
if (typeof Buffer === 'function' && typeof Buffer.prototype.latin1Slice === 'function') { | ||
// Note that calling `Buffer.prototype.toString.call(buf, 'binary')` on a | ||
// `Uint8Array` throws because Node's internal implementation expects the | ||
// argument to be a `Buffer` specifically. | ||
bufferToBinaryString = Function.prototype.call.bind( | ||
Buffer.prototype.latin1Slice); | ||
} else { | ||
bufferToBinaryString = function(buf) { | ||
let str = ''; | ||
let i = 0, len = buf.length; | ||
for (; i + 7 < len; i += 8) { | ||
str += String.fromCharCode( | ||
buf[i], | ||
buf[i + 1], | ||
buf[i + 2], | ||
buf[i + 3], | ||
buf[i + 4], | ||
buf[i + 5], | ||
buf[i + 6], | ||
buf[i + 7] | ||
); | ||
} | ||
for (; i < len; i++) { | ||
str += String.fromCharCode(buf[i]); | ||
} | ||
return str; | ||
}; | ||
} | ||
let binaryStringToBuffer; | ||
if (typeof Buffer === 'function') { | ||
binaryStringToBuffer = function(str) { | ||
let buf = Buffer.from(str, 'binary'); | ||
return new Uint8Array(buf.buffer, buf.byteOffset, buf.length); | ||
}; | ||
} else { | ||
binaryStringToBuffer = function(str) { | ||
let buf = new Uint8Array(str.length); | ||
for (let i = 0; i < str.length; i++) { | ||
buf[i] = str.charCodeAt(i); | ||
} | ||
return Buffer.from(buf); | ||
}; | ||
} | ||
// Having multiple views into the same buffer seems to massively decrease read | ||
// performance. To read and write float and double types, copy them to and from | ||
// this data view instead. | ||
const FLOAT_VIEW = new DataView(new ArrayBuffer(8)); | ||
/** | ||
@@ -430,422 +541,539 @@ * A tap is a buffer which remembers what has been already read. | ||
*/ | ||
function Tap(buf, pos) { | ||
this.buf = buf; | ||
this.pos = pos | 0; | ||
if (this.pos < 0) { | ||
throw new Error('negative offset'); | ||
class Tap { | ||
constructor (buf, pos) { | ||
this.setData(buf, pos); | ||
} | ||
} | ||
/** | ||
* Check that the tap is in a valid state. | ||
* | ||
* For efficiency reasons, none of the methods below will fail if an overflow | ||
* occurs (either read, skip, or write). For this reason, it is up to the | ||
* caller to always check that the read, skip, or write was valid by calling | ||
* this method. | ||
*/ | ||
Tap.prototype.isValid = function () { return this.pos <= this.buf.length; }; | ||
setData (buf, pos) { | ||
if (typeof Buffer === 'function' && buf instanceof Buffer) { | ||
buf = new Uint8Array(buf.buffer, buf.byteOffset, buf.length); | ||
} | ||
this.arr = buf; | ||
this.pos = pos | 0; | ||
if (this.pos < 0) { | ||
throw new Error('negative offset'); | ||
} | ||
} | ||
// Read, skip, write methods. | ||
// | ||
// These should fail silently when the buffer overflows. Note this is only | ||
// required to be true when the functions are decoding valid objects. For | ||
// example errors will still be thrown if a bad count is read, leading to a | ||
// negative position offset (which will typically cause a failure in | ||
// `readFixed`). | ||
get length() { | ||
return this.arr.length; | ||
} | ||
Tap.prototype.readBoolean = function () { return !!this.buf[this.pos++]; }; | ||
reinitialize (capacity) { | ||
this.setData(new Uint8Array(capacity)); | ||
} | ||
Tap.prototype.skipBoolean = function () { this.pos++; }; | ||
static fromBuffer (buf, pos) { | ||
return new Tap(buf, pos); | ||
} | ||
Tap.prototype.writeBoolean = function (b) { this.buf[this.pos++] = !!b; }; | ||
static withCapacity (capacity) { | ||
let buf = new Uint8Array(capacity); | ||
return new Tap(buf); | ||
} | ||
Tap.prototype.readInt = Tap.prototype.readLong = function () { | ||
var n = 0; | ||
var k = 0; | ||
var buf = this.buf; | ||
var b, h, f, fk; | ||
toBuffer () { | ||
return this.arr.slice(0, this.pos); | ||
} | ||
do { | ||
b = buf[this.pos++]; | ||
h = b & 0x80; | ||
n |= (b & 0x7f) << k; | ||
k += 7; | ||
} while (h && k < 28); | ||
subarray (start, end) { | ||
return this.arr.subarray(start, end); | ||
} | ||
if (h) { | ||
// Switch to float arithmetic, otherwise we might overflow. | ||
f = n; | ||
fk = 268435456; // 2 ** 28. | ||
do { | ||
b = buf[this.pos++]; | ||
f += (b & 0x7f) * fk; | ||
fk *= 128; | ||
} while (b & 0x80); | ||
return (f % 2 ? -(f + 1) : f) / 2; | ||
append (newBuf) { | ||
const newArr = new Uint8Array(this.arr.length + newBuf.length); | ||
newArr.set(this.arr, 0); | ||
newArr.set(newBuf, this.arr.length); | ||
this.setData(newArr, 0); | ||
} | ||
return (n >> 1) ^ -(n & 1); | ||
}; | ||
forward (newBuf) { | ||
const subArr = this.arr.subarray(this.pos); | ||
const newArr = new Uint8Array(subArr.length + newBuf.length); | ||
newArr.set(subArr, 0); | ||
newArr.set(newBuf, subArr.length); | ||
this.setData(newArr, 0); | ||
} | ||
Tap.prototype.skipInt = Tap.prototype.skipLong = function () { | ||
var buf = this.buf; | ||
while (buf[this.pos++] & 0x80) {} | ||
}; | ||
/** | ||
* Check that the tap is in a valid state. | ||
* | ||
* For efficiency reasons, none of the methods below will fail if an overflow | ||
* occurs (either read, skip, or write). For this reason, it is up to the | ||
* caller to always check that the read, skip, or write was valid by calling | ||
* this method. | ||
*/ | ||
isValid () { return this.pos <= this.arr.length; } | ||
Tap.prototype.writeInt = Tap.prototype.writeLong = function (n) { | ||
var buf = this.buf; | ||
var f, m; | ||
_invalidate () { this.pos = this.arr.length + 1; } | ||
if (n >= -1073741824 && n < 1073741824) { | ||
// Won't overflow, we can use integer arithmetic. | ||
m = n >= 0 ? n << 1 : (~n << 1) | 1; | ||
// Read, skip, write methods. | ||
// | ||
// These should fail silently when the buffer overflows. Note this is only | ||
// required to be true when the functions are decoding valid objects. For | ||
// example errors will still be thrown if a bad count is read, leading to a | ||
// negative position offset (which will typically cause a failure in | ||
// `readFixed`). | ||
readBoolean () { return !!this.arr[this.pos++]; } | ||
skipBoolean () { this.pos++; } | ||
writeBoolean (b) { this.arr[this.pos++] = !!b; } | ||
readLong () { | ||
let n = 0; | ||
let k = 0; | ||
let buf = this.arr; | ||
let b, h, f, fk; | ||
do { | ||
buf[this.pos] = m & 0x7f; | ||
m >>= 7; | ||
} while (m && (buf[this.pos++] |= 0x80)); | ||
} else { | ||
// We have to use slower floating arithmetic. | ||
f = n >= 0 ? n * 2 : (-n * 2) - 1; | ||
do { | ||
buf[this.pos] = f & 0x7f; | ||
f /= 128; | ||
} while (f >= 1 && (buf[this.pos++] |= 0x80)); | ||
b = buf[this.pos++]; | ||
h = b & 0x80; | ||
n |= (b & 0x7f) << k; | ||
k += 7; | ||
} while (h && k < 28); | ||
if (h) { | ||
// Switch to float arithmetic, otherwise we might overflow. | ||
f = n; | ||
fk = 268435456; // 2 ** 28. | ||
do { | ||
b = buf[this.pos++]; | ||
f += (b & 0x7f) * fk; | ||
fk *= 128; | ||
} while (b & 0x80); | ||
return (f % 2 ? -(f + 1) : f) / 2; | ||
} | ||
return (n >> 1) ^ -(n & 1); | ||
} | ||
this.pos++; | ||
}; | ||
Tap.prototype.readFloat = function () { | ||
var buf = this.buf; | ||
var pos = this.pos; | ||
this.pos += 4; | ||
if (this.pos > buf.length) { | ||
return; | ||
skipLong () { | ||
let buf = this.arr; | ||
while (buf[this.pos++] & 0x80) {} | ||
} | ||
return this.buf.readFloatLE(pos); | ||
}; | ||
Tap.prototype.skipFloat = function () { this.pos += 4; }; | ||
writeLong (n) { | ||
let buf = this.arr; | ||
let f, m; | ||
Tap.prototype.writeFloat = function (f) { | ||
var buf = this.buf; | ||
var pos = this.pos; | ||
this.pos += 4; | ||
if (this.pos > buf.length) { | ||
return; | ||
if (n >= -1073741824 && n < 1073741824) { | ||
// Won't overflow, we can use integer arithmetic. | ||
m = n >= 0 ? n << 1 : (~n << 1) | 1; | ||
do { | ||
buf[this.pos] = m & 0x7f; | ||
m >>= 7; | ||
} while (m && (buf[this.pos++] |= 0x80)); | ||
} else { | ||
// We have to use slower floating arithmetic. | ||
f = n >= 0 ? n * 2 : (-n * 2) - 1; | ||
do { | ||
buf[this.pos] = f & 0x7f; | ||
f /= 128; | ||
} while (f >= 1 && (buf[this.pos++] |= 0x80)); | ||
} | ||
this.pos++; | ||
} | ||
return this.buf.writeFloatLE(f, pos); | ||
}; | ||
Tap.prototype.readDouble = function () { | ||
var buf = this.buf; | ||
var pos = this.pos; | ||
this.pos += 8; | ||
if (this.pos > buf.length) { | ||
return; | ||
readFloat () { | ||
let pos = this.pos; | ||
this.pos += 4; | ||
if (this.pos > this.arr.length) { | ||
return 0; | ||
} | ||
FLOAT_VIEW.setUint32( | ||
0, | ||
this.arr[pos] | | ||
(this.arr[pos + 1] << 8) | | ||
(this.arr[pos + 2] << 16) | | ||
(this.arr[pos + 3] << 24), | ||
true); | ||
return FLOAT_VIEW.getFloat32(0, true); | ||
} | ||
return this.buf.readDoubleLE(pos); | ||
}; | ||
Tap.prototype.skipDouble = function () { this.pos += 8; }; | ||
skipFloat () { this.pos += 4; } | ||
Tap.prototype.writeDouble = function (d) { | ||
var buf = this.buf; | ||
var pos = this.pos; | ||
this.pos += 8; | ||
if (this.pos > buf.length) { | ||
return; | ||
writeFloat (f) { | ||
let pos = this.pos; | ||
this.pos += 4; | ||
if (this.pos > this.arr.length) { | ||
return; | ||
} | ||
FLOAT_VIEW.setFloat32(0, f, true); | ||
const n = FLOAT_VIEW.getUint32(0, true); | ||
this.arr[pos] = n & 0xff; | ||
this.arr[pos + 1] = (n >> 8) & 0xff; | ||
this.arr[pos + 2] = (n >> 16) & 0xff; | ||
this.arr[pos + 3] = n >> 24; | ||
} | ||
return this.buf.writeDoubleLE(d, pos); | ||
}; | ||
Tap.prototype.readFixed = function (len) { | ||
var pos = this.pos; | ||
this.pos += len; | ||
if (this.pos > this.buf.length) { | ||
return; | ||
readDouble () { | ||
let pos = this.pos; | ||
this.pos += 8; | ||
if (this.pos > this.arr.length) { | ||
return 0; | ||
} | ||
FLOAT_VIEW.setUint32( | ||
0, | ||
this.arr[pos] | | ||
(this.arr[pos + 1] << 8) | | ||
(this.arr[pos + 2] << 16) | | ||
(this.arr[pos + 3] << 24), | ||
true | ||
); | ||
FLOAT_VIEW.setUint32( | ||
4, | ||
this.arr[pos + 4] | | ||
(this.arr[pos + 5] << 8) | | ||
(this.arr[pos + 6] << 16) | | ||
(this.arr[pos + 7] << 24), | ||
true | ||
); | ||
return FLOAT_VIEW.getFloat64(0, true); | ||
} | ||
var fixed = POOL.alloc(len); | ||
this.buf.copy(fixed, 0, pos, pos + len); | ||
return fixed; | ||
}; | ||
Tap.prototype.skipFixed = function (len) { this.pos += len; }; | ||
skipDouble () { this.pos += 8; } | ||
Tap.prototype.writeFixed = function (buf, len) { | ||
len = len || buf.length; | ||
var pos = this.pos; | ||
this.pos += len; | ||
if (this.pos > this.buf.length) { | ||
return; | ||
writeDouble (d) { | ||
let pos = this.pos; | ||
this.pos += 8; | ||
if (this.pos > this.arr.length) { | ||
return; | ||
} | ||
FLOAT_VIEW.setFloat64(0, d, true); | ||
const a = FLOAT_VIEW.getUint32(0, true); | ||
const b = FLOAT_VIEW.getUint32(4, true); | ||
this.arr[pos] = a & 0xff; | ||
this.arr[pos + 1] = (a >> 8) & 0xff; | ||
this.arr[pos + 2] = (a >> 16) & 0xff; | ||
this.arr[pos + 3] = a >> 24; | ||
this.arr[pos + 4] = b & 0xff; | ||
this.arr[pos + 5] = (b >> 8) & 0xff; | ||
this.arr[pos + 6] = (b >> 16) & 0xff; | ||
this.arr[pos + 7] = b >> 24; | ||
} | ||
buf.copy(this.buf, pos, 0, len); | ||
}; | ||
Tap.prototype.readBytes = function () { | ||
return this.readFixed(this.readLong()); | ||
}; | ||
readFixed (len) { | ||
let pos = this.pos; | ||
this.pos += len; | ||
if (this.pos > this.arr.length) { | ||
return; | ||
} | ||
return this.arr.slice(pos, pos + len); | ||
} | ||
Tap.prototype.skipBytes = function () { | ||
var len = this.readLong(); | ||
this.pos += len; | ||
}; | ||
skipFixed (len) { this.pos += len; } | ||
Tap.prototype.writeBytes = function (buf) { | ||
var len = buf.length; | ||
this.writeLong(len); | ||
this.writeFixed(buf, len); | ||
}; | ||
/* istanbul ignore else */ | ||
if (typeof Buffer.prototype.utf8Slice == 'function') { | ||
// Use this optimized function when available. | ||
Tap.prototype.readString = function () { | ||
var len = this.readLong(); | ||
var pos = this.pos; | ||
var buf = this.buf; | ||
writeFixed (buf, len) { | ||
len = len || buf.length; | ||
let pos = this.pos; | ||
this.pos += len; | ||
if (this.pos > buf.length) { | ||
if (this.pos > this.arr.length) { | ||
return; | ||
} | ||
return this.buf.utf8Slice(pos, pos + len); | ||
}; | ||
} else { | ||
Tap.prototype.readString = function () { | ||
var len = this.readLong(); | ||
var pos = this.pos; | ||
var buf = this.buf; | ||
this.pos += len; | ||
if (this.pos > buf.length) { | ||
this.arr.set(buf.subarray(0, len), pos); | ||
} | ||
readBytes () { | ||
let len = this.readLong(); | ||
if (len < 0) { | ||
this._invalidate(); | ||
return; | ||
} | ||
return this.buf.slice(pos, pos + len).toString(); | ||
}; | ||
} | ||
return this.readFixed(len); | ||
} | ||
Tap.prototype.skipString = function () { | ||
var len = this.readLong(); | ||
this.pos += len; | ||
}; | ||
Tap.prototype.writeString = function (s) { | ||
var len = Buffer.byteLength(s); | ||
var buf = this.buf; | ||
this.writeLong(len); | ||
var pos = this.pos; | ||
this.pos += len; | ||
if (this.pos > buf.length) { | ||
return; | ||
} | ||
if (len > 64) { | ||
this._writeUtf8(s, len); | ||
} else { | ||
var i, l, c1, c2; | ||
for (i = 0, l = len; i < l; i++) { | ||
c1 = s.charCodeAt(i); | ||
if (c1 < 0x80) { | ||
buf[pos++] = c1; | ||
} else if (c1 < 0x800) { | ||
buf[pos++] = c1 >> 6 | 0xc0; | ||
buf[pos++] = c1 & 0x3f | 0x80; | ||
} else if ( | ||
(c1 & 0xfc00) === 0xd800 && | ||
((c2 = s.charCodeAt(i + 1)) & 0xfc00) === 0xdc00 | ||
) { | ||
c1 = 0x10000 + ((c1 & 0x03ff) << 10) + (c2 & 0x03ff); | ||
i++; | ||
buf[pos++] = c1 >> 18 | 0xf0; | ||
buf[pos++] = c1 >> 12 & 0x3f | 0x80; | ||
buf[pos++] = c1 >> 6 & 0x3f | 0x80; | ||
buf[pos++] = c1 & 0x3f | 0x80; | ||
} else { | ||
buf[pos++] = c1 >> 12 | 0xe0; | ||
buf[pos++] = c1 >> 6 & 0x3f | 0x80; | ||
buf[pos++] = c1 & 0x3f | 0x80; | ||
} | ||
skipBytes () { | ||
let len = this.readLong(); | ||
if (len < 0) { | ||
this._invalidate(); | ||
return; | ||
} | ||
this.pos += len; | ||
} | ||
}; | ||
/* istanbul ignore else */ | ||
if (typeof Buffer.prototype.utf8Write == 'function') { | ||
Tap.prototype._writeUtf8 = function (str, len) { | ||
this.buf.utf8Write(str, this.pos - len, len); | ||
}; | ||
} else { | ||
// `utf8Write` isn't available in the browser. | ||
Tap.prototype._writeUtf8 = function (str, len) { | ||
this.buf.write(str, this.pos - len, len, 'utf8'); | ||
}; | ||
} | ||
writeBytes (buf) { | ||
let len = buf.length; | ||
this.writeLong(len); | ||
this.writeFixed(buf, len); | ||
} | ||
/* istanbul ignore else */ | ||
if (typeof Buffer.prototype.latin1Write == 'function') { | ||
// `binaryWrite` has been renamed to `latin1Write` in Node v6.4.0, see | ||
// https://github.com/nodejs/node/pull/7111. Note that the `'binary'` | ||
// encoding argument still works however. | ||
Tap.prototype.writeBinary = function (str, len) { | ||
var pos = this.pos; | ||
this.pos += len; | ||
if (this.pos > this.buf.length) { | ||
skipString () { | ||
let len = this.readLong(); | ||
if (len < 0) { | ||
this._invalidate(); | ||
return; | ||
} | ||
this.buf.latin1Write(str, pos, len); | ||
}; | ||
} else if (typeof Buffer.prototype.binaryWrite == 'function') { | ||
Tap.prototype.writeBinary = function (str, len) { | ||
var pos = this.pos; | ||
this.pos += len; | ||
if (this.pos > this.buf.length) { | ||
return; | ||
} | ||
readString () { | ||
let len = this.readLong(); | ||
if (len < 0) { | ||
this._invalidate(); | ||
return ''; | ||
} | ||
this.buf.binaryWrite(str, pos, len); | ||
}; | ||
} else { | ||
// Slowest implementation. | ||
Tap.prototype.writeBinary = function (s, len) { | ||
var pos = this.pos; | ||
let pos = this.pos; | ||
this.pos += len; | ||
if (this.pos > this.buf.length) { | ||
if (this.pos > this.arr.length) { | ||
return; | ||
} | ||
this.buf.write(s, pos, len, 'binary'); | ||
}; | ||
} | ||
// Binary comparison methods. | ||
// | ||
// These are not guaranteed to consume the objects they are comparing when | ||
// returning a non-zero result (allowing for performance benefits), so no other | ||
// operations should be done on either tap after a compare returns a non-zero | ||
// value. Also, these methods do not have the same silent failure requirement | ||
// as read, skip, and write since they are assumed to be called on valid | ||
// buffers. | ||
let arr = this.arr; | ||
let end = pos + len; | ||
if (len > 24) { | ||
return decodeSlice(arr, pos, end); | ||
} | ||
Tap.prototype.matchBoolean = function (tap) { | ||
return this.buf[this.pos++] - tap.buf[tap.pos++]; | ||
}; | ||
let output = ''; | ||
// Consume the string in 4-byte chunks. The performance benefit comes not | ||
// from *reading* in chunks, but calling fromCharCode with 4 characters per | ||
// call. | ||
while (pos + 3 < end) { | ||
let a = arr[pos], b = arr[pos + 1], c = arr[pos + 2], d = arr[pos + 3]; | ||
// If the high bit of any character is set, it's a non-ASCII character. | ||
// Fall back to TextDecoder for the remaining characters. | ||
if ((a | b | c | d) & 0x80) { | ||
output += decodeSlice(arr, pos, end); | ||
return output; | ||
} | ||
output += String.fromCharCode(a, b, c, d); | ||
pos += 4; | ||
} | ||
Tap.prototype.matchInt = Tap.prototype.matchLong = function (tap) { | ||
var n1 = this.readLong(); | ||
var n2 = tap.readLong(); | ||
return n1 === n2 ? 0 : (n1 < n2 ? -1 : 1); | ||
}; | ||
// Handle the remainder of the string. | ||
while (pos < end) { | ||
let char = arr[pos]; | ||
if (char & 0x80) { | ||
output += decodeSlice(arr, pos, end); | ||
return output; | ||
} | ||
output += String.fromCharCode(char); | ||
pos++; | ||
} | ||
Tap.prototype.matchFloat = function (tap) { | ||
var n1 = this.readFloat(); | ||
var n2 = tap.readFloat(); | ||
return n1 === n2 ? 0 : (n1 < n2 ? -1 : 1); | ||
}; | ||
return output; | ||
} | ||
Tap.prototype.matchDouble = function (tap) { | ||
var n1 = this.readDouble(); | ||
var n2 = tap.readDouble(); | ||
return n1 === n2 ? 0 : (n1 < n2 ? -1 : 1); | ||
}; | ||
writeString (s) { | ||
let buf = this.arr; | ||
const stringLen = s.length; | ||
// The maximum number that a signed varint can store in a single byte is 63. | ||
// The maximum size of a UTF-8 representation of a UTF-16 string is 3 times | ||
// its length, as one UTF-16 character can be represented by up to 3 bytes | ||
// in UTF-8. Therefore, if the string is 21 characters or less, we know that | ||
// its length can be stored in a single byte, which is why we choose 21 as | ||
// the small-string threshold specifically. | ||
if (stringLen > 21) { | ||
let encodedLength, encoded; | ||
Tap.prototype.matchFixed = function (tap, len) { | ||
return this.readFixed(len).compare(tap.readFixed(len)); | ||
}; | ||
// If we're already over the buffer size, we don't need to encode the | ||
// string. While encodeInto is actually faster than Buffer.byteLength, we | ||
// could still overflow the preallocated encoding buffer and have to fall | ||
// back to allocating, which is really really slow. | ||
if (this.isValid()) { | ||
encoded = encodeSlice(s); | ||
encodedLength = encoded.length; | ||
} else { | ||
encodedLength = utf8Length(s); | ||
} | ||
this.writeLong(encodedLength); | ||
let pos = this.pos; | ||
this.pos += encodedLength; | ||
Tap.prototype.matchBytes = Tap.prototype.matchString = function (tap) { | ||
var l1 = this.readLong(); | ||
var p1 = this.pos; | ||
this.pos += l1; | ||
var l2 = tap.readLong(); | ||
var p2 = tap.pos; | ||
tap.pos += l2; | ||
var b1 = this.buf.slice(p1, this.pos); | ||
var b2 = tap.buf.slice(p2, tap.pos); | ||
return b1.compare(b2); | ||
}; | ||
if (this.isValid() && typeof encoded != 'undefined') { | ||
buf.set(encoded, pos); | ||
} | ||
} else { | ||
// For small strings, this manual implementation is faster. | ||
// Functions for supporting custom long classes. | ||
// | ||
// The two following methods allow the long implementations to not have to | ||
// worry about Avro's zigzag encoding, we directly expose longs as unpacked. | ||
// Set aside 1 byte to write the string length. | ||
let pos = this.pos + 1; | ||
let startPos = pos; | ||
let bufLen = buf.length; | ||
Tap.prototype.unpackLongBytes = function () { | ||
var res = newBuffer(8); | ||
var n = 0; | ||
var i = 0; // Byte index in target buffer. | ||
var j = 6; // Bit offset in current target buffer byte. | ||
var buf = this.buf; | ||
var b, neg; | ||
// This is not a micro-optimization: caching the string length for the | ||
// loop predicate really does make a difference! | ||
for (let i = 0; i < stringLen; i++) { | ||
let c1 = s.charCodeAt(i); | ||
let c2; | ||
if (c1 < 0x80) { | ||
if (pos < bufLen) buf[pos] = c1; | ||
pos++; | ||
} else if (c1 < 0x800) { | ||
if (pos + 1 < bufLen) { | ||
buf[pos] = c1 >> 6 | 0xc0; | ||
buf[pos + 1] = c1 & 0x3f | 0x80; | ||
} | ||
pos += 2; | ||
} else if ( | ||
(c1 & 0xfc00) === 0xd800 && | ||
((c2 = s.charCodeAt(i + 1)) & 0xfc00) === 0xdc00 | ||
) { | ||
c1 = 0x10000 + ((c1 & 0x03ff) << 10) + (c2 & 0x03ff); | ||
i++; | ||
if (pos + 3 < bufLen) { | ||
buf[pos] = c1 >> 18 | 0xf0; | ||
buf[pos + 1] = c1 >> 12 & 0x3f | 0x80; | ||
buf[pos + 2] = c1 >> 6 & 0x3f | 0x80; | ||
buf[pos + 3] = c1 & 0x3f | 0x80; | ||
} | ||
pos += 4; | ||
} else { | ||
if (pos + 2 < bufLen) { | ||
buf[pos] = c1 >> 12 | 0xe0; | ||
buf[pos + 1] = c1 >> 6 & 0x3f | 0x80; | ||
buf[pos + 2] = c1 & 0x3f | 0x80; | ||
} | ||
pos += 3; | ||
} | ||
} | ||
b = buf[this.pos++]; | ||
neg = b & 1; | ||
res.fill(0); | ||
// Note that we've not yet updated this.pos, so it's currently pointing to | ||
// the place where we want to write the string length. | ||
if (this.pos <= bufLen) { | ||
this.writeLong(pos - startPos); | ||
} | ||
n |= (b & 0x7f) >> 1; | ||
while (b & 0x80) { | ||
b = buf[this.pos++]; | ||
n |= (b & 0x7f) << j; | ||
j += 7; | ||
if (j >= 8) { | ||
// Flush byte. | ||
j -= 8; | ||
res[i++] = n; | ||
n >>= 8; | ||
this.pos = pos; | ||
} | ||
} | ||
res[i] = n; | ||
if (neg) { | ||
invert(res, 8); | ||
// Binary comparison methods. | ||
// | ||
// These are not guaranteed to consume the objects they are comparing when | ||
// returning a non-zero result (allowing for performance benefits), so no | ||
// other operations should be done on either tap after a compare returns a | ||
// non-zero value. Also, these methods do not have the same silent failure | ||
// requirement as read, skip, and write since they are assumed to be called on | ||
// valid buffers. | ||
matchBoolean (tap) { | ||
return this.arr[this.pos++] - tap.arr[tap.pos++]; | ||
} | ||
return res; | ||
}; | ||
matchLong (tap) { | ||
let n1 = this.readLong(); | ||
let n2 = tap.readLong(); | ||
return n1 === n2 ? 0 : (n1 < n2 ? -1 : 1); | ||
} | ||
Tap.prototype.packLongBytes = function (buf) { | ||
var neg = (buf[7] & 0x80) >> 7; | ||
var res = this.buf; | ||
var j = 1; | ||
var k = 0; | ||
var m = 3; | ||
var n; | ||
matchFloat (tap) { | ||
let n1 = this.readFloat(); | ||
let n2 = tap.readFloat(); | ||
return n1 === n2 ? 0 : (n1 < n2 ? -1 : 1); | ||
} | ||
if (neg) { | ||
invert(buf, 8); | ||
n = 1; | ||
} else { | ||
n = 0; | ||
matchDouble (tap) { | ||
let n1 = this.readDouble(); | ||
let n2 = tap.readDouble(); | ||
return n1 === n2 ? 0 : (n1 < n2 ? -1 : 1); | ||
} | ||
var parts = [ | ||
buf.readUIntLE(0, 3), | ||
buf.readUIntLE(3, 3), | ||
buf.readUIntLE(6, 2) | ||
]; | ||
// Not reading more than 24 bits because we need to be able to combine the | ||
// "carry" bits from the previous part and JavaScript only supports bitwise | ||
// operations on 32 bit integers. | ||
while (m && !parts[--m]) {} // Skip trailing 0s. | ||
matchFixed (tap, len) { | ||
return bufCompare(this.readFixed(len), tap.readFixed(len)); | ||
} | ||
// Leading parts (if any), we never bail early here since we need the | ||
// continuation bit to be set. | ||
while (k < m) { | ||
n |= parts[k++] << j; | ||
j += 24; | ||
while (j > 7) { | ||
res[this.pos++] = (n & 0x7f) | 0x80; | ||
n >>= 7; | ||
j -= 7; | ||
matchBytes (tap) { | ||
let l1 = this.readLong(); | ||
let p1 = this.pos; | ||
this.pos += l1; | ||
let l2 = tap.readLong(); | ||
let p2 = tap.pos; | ||
tap.pos += l2; | ||
let b1 = this.arr.subarray(p1, this.pos); | ||
let b2 = tap.arr.subarray(p2, tap.pos); | ||
return bufCompare(b1, b2); | ||
} | ||
// Functions for supporting custom long classes. | ||
// | ||
// The two following methods allow the long implementations to not have to | ||
// worry about Avro's zigzag encoding, we directly expose longs as unpacked. | ||
unpackLongBytes () { | ||
let res = new Uint8Array(8); | ||
let n = 0; | ||
let i = 0; // Byte index in target buffer. | ||
let j = 6; // Bit offset in current target buffer byte. | ||
let buf = this.arr; | ||
let b = buf[this.pos++]; | ||
let neg = b & 1; | ||
res.fill(0); | ||
n |= (b & 0x7f) >> 1; | ||
while (b & 0x80) { | ||
b = buf[this.pos++]; | ||
n |= (b & 0x7f) << j; | ||
j += 7; | ||
if (j >= 8) { | ||
// Flush byte. | ||
j -= 8; | ||
res[i++] = n; | ||
n >>= 8; | ||
} | ||
} | ||
res[i] = n; | ||
if (neg) { | ||
invert(res, 8); | ||
} | ||
return res; | ||
} | ||
// Final part, similar to normal packing aside from the initial offset. | ||
n |= parts[m] << j; | ||
do { | ||
res[this.pos] = n & 0x7f; | ||
n >>= 7; | ||
} while (n && (res[this.pos++] |= 0x80)); | ||
this.pos++; | ||
packLongBytes (buf) { | ||
let neg = (buf[7] & 0x80) >> 7; | ||
let res = this.arr; | ||
let j = 1; | ||
let k = 0; | ||
let m = 3; | ||
let n; | ||
// Restore original buffer (could make this optional?). | ||
if (neg) { | ||
invert(buf, 8); | ||
if (neg) { | ||
invert(buf, 8); | ||
n = 1; | ||
} else { | ||
n = 0; | ||
} | ||
let parts = [ | ||
(buf[0] | (buf[1] << 8) | (buf[2] << 16)), | ||
(buf[3] | (buf[4] << 8) | (buf[5] << 16)), | ||
(buf[6] | (buf[7] << 8)) | ||
]; | ||
// Not reading more than 24 bits because we need to be able to combine the | ||
// "carry" bits from the previous part and JavaScript only supports bitwise | ||
// operations on 32 bit integers. | ||
while (m && !parts[--m]) {} // Skip trailing 0s. | ||
// Leading parts (if any), we never bail early here since we need the | ||
// continuation bit to be set. | ||
while (k < m) { | ||
n |= parts[k++] << j; | ||
j += 24; | ||
while (j > 7) { | ||
res[this.pos++] = (n & 0x7f) | 0x80; | ||
n >>= 7; | ||
j -= 7; | ||
} | ||
} | ||
// Final part, similar to normal packing aside from the initial offset. | ||
n |= parts[m] << j; | ||
do { | ||
res[this.pos] = n & 0x7f; | ||
n >>= 7; | ||
} while (n && (res[this.pos++] |= 0x80)); | ||
this.pos++; | ||
// Restore original buffer (could make this optional?). | ||
if (neg) { | ||
invert(buf, 8); | ||
} | ||
} | ||
}; | ||
} | ||
@@ -857,4 +1085,4 @@ // Helpers. | ||
* | ||
* @param buf {Buffer} Non-empty buffer to invert. | ||
* @param len {Number} Buffer length (must be positive). | ||
* @param {Uint8Array} buf Non-empty buffer to invert. | ||
* @param {number} len Buffer length (must be positive). | ||
*/ | ||
@@ -867,20 +1095,49 @@ function invert(buf, len) { | ||
/** | ||
* Prints an object as a string; mostly used for printing objects in errors. | ||
* @param {object} obj The object to display. | ||
* @returns The object as JSON. | ||
*/ | ||
function printJSON (obj) { | ||
let seen = new Set(); | ||
try { | ||
return JSON.stringify(obj, (key, value) => { | ||
if (seen.has(value)) return '[Circular]'; | ||
if (typeof value === 'object' && value !== null) seen.add(value); | ||
// eslint-disable-next-line no-undef | ||
if (typeof BigInt !== 'undefined' && (value instanceof BigInt)) { | ||
return `[BigInt ${value.toString()}n]`; | ||
} | ||
return value; | ||
}); | ||
} catch (err) { | ||
return '[object]'; | ||
} | ||
} | ||
module.exports = { | ||
abstractFunction: abstractFunction, | ||
bufferFrom: bufferFrom, | ||
capitalize: capitalize, | ||
copyOwnProperties: copyOwnProperties, | ||
getHash: getHash, | ||
compare: compare, | ||
getOption: getOption, | ||
jsonEnd: jsonEnd, | ||
newBuffer: newBuffer, | ||
objectValues: objectValues, | ||
toMap: toMap, | ||
singleIndexOf: singleIndexOf, | ||
hasDuplicates: hasDuplicates, | ||
Lcg: Lcg, | ||
OrderedQueue: OrderedQueue, | ||
Tap: Tap | ||
abstractFunction, | ||
bufCompare, | ||
bufEqual, | ||
bufferToBinaryString, | ||
binaryStringToBuffer, | ||
capitalize, | ||
copyOwnProperties, | ||
getHash: platform.getHash, | ||
compare, | ||
getOption, | ||
impliedNamespace, | ||
isBufferLike, | ||
isValidName, | ||
jsonEnd, | ||
objectValues, | ||
qualify, | ||
toMap, | ||
singleIndexOf, | ||
hasDuplicates, | ||
unqualify, | ||
Lcg, | ||
OrderedQueue, | ||
Tap, | ||
printJSON | ||
}; |
{ | ||
"name": "avsc", | ||
"version": "6.0.0-alpha.13", | ||
"version": "6.0.0-alpha.14", | ||
"description": "Avro for JavaScript", | ||
@@ -31,5 +31,5 @@ "homepage": "https://github.com/mtth/avsc", | ||
"files": [ | ||
"etc/browser", | ||
"lib", | ||
"etc/browser", | ||
"types" | ||
"types/index.d.ts" | ||
], | ||
@@ -41,24 +41,27 @@ "main": "./lib", | ||
"./lib/files": "./etc/browser/lib/files.js", | ||
"crypto": "./etc/browser/lib/crypto.js" | ||
"./lib/platform": "./etc/browser/lib/platform.js" | ||
}, | ||
"engines": { | ||
"node": ">=0.11" | ||
"node": ">=6.0.0" | ||
}, | ||
"scripts": { | ||
"cover": "istanbul cover _mocha", | ||
"clean": "rm -rf coverage dist node_modules", | ||
"dist": "./etc/scripts/dist", | ||
"perf": "./etc/scripts/perf etc/schemas/*", | ||
"test": "mocha", | ||
"zuul": "zuul --no-coverage -- test/*.js" | ||
"check-types": "tsc --strict --noEmit types/test/*.ts", | ||
"clean": "rm -rf coverage node_modules", | ||
"cover": "nyc mocha -- --ui tdd", | ||
"lint": "eslint etc/ lib/ test/ \"etc/scripts/**\"", | ||
"perf": "node --expose-gc ./etc/scripts/perf etc/schemas/*", | ||
"test": "mocha --ui tdd --reporter dot" | ||
}, | ||
"devDependencies": { | ||
"coveralls": "^3.0.2", | ||
"istanbul": "^0.4.5", | ||
"mocha": "^5.2.0", | ||
"tmp": "^0.0.33" | ||
"@types/node": "^22.5.5", | ||
"benchmark": "~2.1.4", | ||
"eslint": "^8.30.0", | ||
"mocha": "^10.2.0", | ||
"nyc": "~15.0.0", | ||
"tmp": "^0.1.0", | ||
"typescript": "^5.6.2" | ||
}, | ||
"author": { | ||
"name": "Matthieu Monsch", | ||
"email": "monsch@alum.mit.edu" | ||
"email": "mtth@apache.org" | ||
}, | ||
@@ -65,0 +68,0 @@ "license": "MIT", |
@@ -1,2 +0,2 @@ | ||
# Avsc [![NPM version](https://img.shields.io/npm/v/avsc.svg)](https://www.npmjs.com/package/avsc) [![Download count](https://img.shields.io/npm/dm/avsc.svg)](https://www.npmjs.com/package/avsc) [![Build status](https://travis-ci.org/mtth/avsc.svg?branch=master)](https://travis-ci.org/mtth/avsc) [![Coverage status](https://coveralls.io/repos/mtth/avsc/badge.svg?branch=master&service=github)](https://coveralls.io/github/mtth/avsc?branch=master) | ||
# Avsc [![NPM version](https://img.shields.io/npm/v/avsc.svg)](https://www.npmjs.com/package/avsc) [![Download count](https://img.shields.io/npm/dm/avsc.svg)](https://www.npmjs.com/package/avsc) [![CI](https://github.com/mtth/avsc/actions/workflows/ci.yml/badge.svg)](https://github.com/mtth/avsc/actions/workflows/ci.yml) [![Coverage status](https://coveralls.io/repos/mtth/avsc/badge.svg?branch=master&service=github)](https://coveralls.io/github/mtth/avsc?branch=master) | ||
@@ -12,3 +12,3 @@ Pure JavaScript implementation of the [Avro | ||
+ All the Avro goodness and more: [type inference][type-inference], [schema | ||
evolution][schema-evolution], and [remote procedure calls][rpc]. | ||
evolution][schema-evolution]... | ||
+ Support for [serializing arbitrary JavaScript objects][logical-types]. | ||
@@ -20,11 +20,7 @@ + Unopinionated [64-bit integer compatibility][custom-long]. | ||
```bash | ||
```sh | ||
$ npm install avsc | ||
``` | ||
`avsc` is compatible with all versions of [node.js][] since `0.11` and major | ||
browsers via [browserify][]. For convenience, you can also find compiled | ||
distributions with the [releases][] (but please host your own copy). | ||
## Documentation | ||
@@ -41,4 +37,2 @@ | ||
Inside a node.js module, or using browserify: | ||
```javascript | ||
@@ -53,4 +47,8 @@ const avro = require('avsc'); | ||
type: 'record', | ||
name: 'Pet', | ||
fields: [ | ||
{name: 'kind', type: {type: 'enum', symbols: ['CAT', 'DOG']}}, | ||
{ | ||
name: 'kind', | ||
type: {type: 'enum', name: 'PetKind', symbols: ['CAT', 'DOG']} | ||
}, | ||
{name: 'name', type: 'string'} | ||
@@ -81,15 +79,7 @@ ] | ||
+ Get a [readable stream][readable-stream] of decoded values from an Avro | ||
container file compressed using [Snappy][snappy] (see the [`BlockDecoder` | ||
API][decoder-api] for an example including checksum validation): | ||
container file (see the [`BlockDecoder` API][decoder-api] for an example | ||
compressed using [Snappy][snappy]): | ||
```javascript | ||
const snappy = require('snappy'); // Or your favorite Snappy library. | ||
const codecs = { | ||
snappy: function (buf, cb) { | ||
// Avro appends checksums to compressed blocks, which we skip here. | ||
return snappy.uncompress(buf.slice(0, buf.length - 4), cb); | ||
} | ||
}; | ||
avro.createFileDecoder('./values.avro', {codecs}) | ||
avro.createFileDecoder('./values.avro') | ||
.on('metadata', function (type) { /* `type` is the writer's type. */ }) | ||
@@ -99,28 +89,5 @@ .on('data', function (val) { /* Do something with the decoded value. */ }); | ||
+ Implement a TCP server for an [IDL-defined][idl] protocol: | ||
```javascript | ||
// We first generate a protocol from its IDL specification. | ||
const protocol = avro.readProtocol(` | ||
protocol LengthService { | ||
/** Endpoint which returns the length of the input string. */ | ||
int stringLength(string str); | ||
} | ||
`); | ||
// We then create a corresponding server, implementing our endpoint. | ||
const server = avro.Service.forProtocol(protocol) | ||
.createServer() | ||
.onStringLength(function (str, cb) { cb(null, str.length); }); | ||
// Finally, we use our server to respond to incoming TCP connections! | ||
require('net').createServer() | ||
.on('connection', (con) => { server.createChannel(con); }) | ||
.listen(24950); | ||
``` | ||
[benchmarks]: https://github.com/mtth/avsc/wiki/Benchmarks | ||
[browser-support]: https://github.com/mtth/avsc/wiki#browser-support | ||
[browserify]: http://browserify.org/ | ||
[custom-long]: https://github.com/mtth/avsc/wiki/Advanced-usage#custom-long-types | ||
@@ -133,6 +100,4 @@ [decoder-api]: https://github.com/mtth/avsc/wiki/API#class-blockdecoderopts | ||
[readable-stream]: https://nodejs.org/api/stream.html#stream_class_stream_readable | ||
[releases]: https://github.com/mtth/avsc/releases | ||
[rpc]: https://github.com/mtth/avsc/wiki/Quickstart#services | ||
[schema-evolution]: https://github.com/mtth/avsc/wiki/Advanced-usage#schema-evolution | ||
[snappy]: https://avro.apache.org/docs/current/spec.html#snappy | ||
[type-inference]: https://github.com/mtth/avsc/wiki/Advanced-usage#type-inference |
@@ -9,4 +9,3 @@ // Note: These typings are incomplete (https://github.com/mtth/avsc/pull/134). | ||
import * as stream from 'stream'; | ||
import { EventEmitter } from 'events'; | ||
import * as stream from 'stream'; | ||
@@ -16,3 +15,3 @@ //"virtual" namespace (no JS, just types) for Avro Schema | ||
export type AvroSchema = DefinedType | DefinedType[]; | ||
type DefinedType = PrimitiveType | ComplexType | LogicalType | string; | ||
type DefinedType = PrimitiveType | ComplexType | LogicalType | Type | string; | ||
type PrimitiveType = 'null' | 'boolean' | 'int' | 'long' | 'float' | 'double' | 'bytes' | 'string'; | ||
@@ -27,3 +26,3 @@ type ComplexType = NamedType | RecordType | EnumType | MapType | ArrayType | FixedType; | ||
interface RecordType { | ||
type: "record"; | ||
type: "record" | "error"; | ||
name: string; | ||
@@ -38,4 +37,4 @@ namespace?: string; | ||
default?: any; | ||
order?: "ascending" | "descending" | "ignore"; | ||
}[]; | ||
order?: "ascending" | "descending" | "ignore"; | ||
} | ||
@@ -50,2 +49,3 @@ | ||
symbols: string[]; | ||
default?: string; | ||
} | ||
@@ -77,5 +77,5 @@ | ||
type Schema = Type | schema.AvroSchema; | ||
type Schema = schema.AvroSchema; | ||
type Callback<V, Err = any> = (err: Err, value: V) => void; | ||
type Callback<V, Err = any> = (err: Err | null, value?: V) => void; | ||
@@ -104,2 +104,17 @@ | ||
/** | ||
* A projection function that is used when unwrapping unions. | ||
* This function is called at schema parsing time on each union with its branches' | ||
* types. | ||
* If it returns a non-null (function) value, that function will be called each | ||
* time a value's branch needs to be inferred and should return the branch's | ||
* index. | ||
* The index muss be a number between 0 and length-1 of the passed types. | ||
* In this case (a branch index) the union will use an unwrapped representation. | ||
* Otherwise (undefined), the union will be wrapped. | ||
*/ | ||
type BranchProjection = (types: ReadonlyArray<Type>) => | ||
| ((val: unknown) => number) | ||
| undefined; | ||
interface ForSchemaOptions { | ||
@@ -110,5 +125,6 @@ assertLogicalTypes: boolean; | ||
noAnonymousTypes: boolean; | ||
omitRecordMethods: boolean; | ||
registry: { [name: string]: Type }; | ||
typeHook: (schema: Schema, opts: ForSchemaOptions) => Type; | ||
wrapUnions: boolean | 'auto' | 'always' | 'never'; | ||
typeHook: (schema: Schema | string, opts: ForSchemaOptions) => Type | undefined; | ||
wrapUnions: BranchProjection | boolean | 'auto' | 'always' | 'never'; | ||
} | ||
@@ -136,4 +152,14 @@ | ||
} | ||
interface ImportHookPayload { | ||
path: string; | ||
type: 'idl' | 'protocol' | 'schema'; | ||
} | ||
type ImportHookCallback = (err: any, params?: {contents: string, path: string}) => void; | ||
type ImportHook = (payload: ImportHookPayload, cb: ImportHookCallback) => void; | ||
interface AssembleOptions { | ||
importHook: (filePath: string, type: 'idl', callback: Callback<object>) => void; | ||
importHook: (params: ImportHookPayload, callback: Callback<object>) => void; | ||
} | ||
@@ -158,5 +184,3 @@ | ||
export function createBlobDecoder(blob: Blob, opts?: Partial<DecoderOptions>): streams.BlockDecoder; | ||
export function discoverProtocol(transport: Service.Transport, options: any, callback: Callback<any>): void; | ||
export function discoverProtocol(transport: Service.Transport, callback: Callback<any>): void; | ||
export function extractFileHeader(filePath: string, options?: any): void; | ||
export function extractFileHeader(filePath: string, options?: any): any; | ||
export function parse(schemaOrProtocolIdl: string, options?: any): any; // TODO protocol literal or Type | ||
@@ -198,91 +222,3 @@ export function readProtocol(protocolIdl: string, options?: Partial<DecoderOptions>): any; | ||
export class Service { | ||
constructor(name: any, messages: any, types: any, ptcl: any, server: any); | ||
createClient(options?: Partial<Service.ClientOptions>): Service.Client; | ||
createServer(options?: Partial<Service.ServerOptions>): Service.Server; | ||
equals(args: any): boolean; // deprecated | ||
inspect(): string; | ||
message(name: string): any; | ||
type(name: string): Type | undefined; | ||
readonly doc: string | undefined; | ||
readonly hash: Buffer; | ||
readonly messages: any[]; | ||
readonly name: string; | ||
readonly protocol: any; | ||
readonly types: Type[]; | ||
static compatible(client: Service.Client, server: Service.Server): boolean; | ||
static forProtocol(protocol: any, options?: any): Service; | ||
static isService(obj: any): boolean; | ||
} | ||
export namespace Service { | ||
interface ClientChannel extends EventEmitter { | ||
readonly client: Client; | ||
readonly destroyed: boolean; | ||
readonly draining: boolean; | ||
readonly pending: number; | ||
readonly timeout: number; | ||
ping(timeout?: number, cb?: any): void; | ||
destroy(noWait?: boolean): void; | ||
} | ||
interface ServerChannel extends EventEmitter { | ||
readonly destroyed: boolean; | ||
readonly draining: boolean; | ||
readonly pending: number; | ||
readonly server: Server; | ||
destroy(noWait?: boolean): void; | ||
} | ||
interface ClientOptions { | ||
buffering: boolean; | ||
channelPolicy: any; | ||
strictTypes: boolean; | ||
timeout: number; | ||
remoteProtocols: boolean; | ||
transport?: Transport; | ||
} | ||
interface ServerOptions { | ||
objectMode: boolean; | ||
} | ||
type TransportFunction = () => void; // TODO | ||
type Transport = stream.Duplex | TransportFunction; | ||
interface ChannelCreateOptions { | ||
objectMode: boolean; | ||
} | ||
interface ChannelDestroyOptions { | ||
noWait: boolean; | ||
} | ||
class Server extends EventEmitter { | ||
constructor(svc: any, opts: any); | ||
readonly service: Service; | ||
// on<message>() | ||
activeChannels(): ServerChannel[]; | ||
createChannel(transport: Transport, options?: Partial<ChannelCreateOptions>): ServerChannel; | ||
onMessage<T>(name: string, handler: (arg1: any, callback: Callback<T>) => void): this; | ||
remoteProtocols(): any[]; | ||
use(...args: any[]): this; | ||
} | ||
class Client extends EventEmitter { | ||
constructor(svc: any, opts: any); | ||
activeChannels(): ClientChannel[]; | ||
createChannel(transport: Transport, options?: Partial<ChannelCreateOptions>): ClientChannel; | ||
destroyChannels(options?: Partial<ChannelDestroyOptions>): void; | ||
emitMessage<T>(name: string, req: any, options?: any, callback?: Callback<T>): void // TODO | ||
remoteProtocols(): any[]; | ||
use(...args: any[]): this; | ||
} | ||
} | ||
export namespace streams { | ||
@@ -370,3 +306,3 @@ | ||
random(): LongType; | ||
static __with(methods: object, noUnpack?: boolean): void; | ||
static __with(methods: object, noUnpack?: boolean): LongType; | ||
} | ||
@@ -373,0 +309,0 @@ |
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Uses eval
Supply chain riskPackage uses dynamic code execution (e.g., eval()), which is a dangerous practice. This can prevent the code from running in certain environments and increases the risk that the code may contain exploits or malicious behavior.
Found 1 instance in 1 package
8
190809
7
6080
97