cypher-stream
Advanced tools
Comparing version 0.2.2 to 0.3.0
{ | ||
"name": "cypher-stream", | ||
"version": "0.2.2", | ||
"version": "0.3.0", | ||
"description": "Streams cypher query results in a clean format", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -154,2 +154,3 @@ 'use strict'; | ||
var results = 0; | ||
var timeout = process.env.TRAVIS_CI ? 1000 : 50; | ||
var transaction = cypher.transaction() | ||
@@ -175,3 +176,3 @@ .on('data', function () { | ||
transaction.rollback(); | ||
}, 50); | ||
}, timeout); | ||
}); | ||
@@ -302,3 +303,3 @@ | ||
var results = 0; | ||
var transaction = cypher.transaction({metadata: true}) | ||
var transaction = cypher.transaction({ metadata: true }) | ||
.on('data', function (result) { | ||
@@ -305,0 +306,0 @@ results++; |
@@ -19,3 +19,5 @@ 'use strict'; | ||
var self = this; | ||
var buffer = []; | ||
var transactionId; | ||
var debounce; | ||
var debounceTime = options && options.debounceTime || 0; | ||
@@ -25,43 +27,33 @@ var batchSize = options && options.batchSize || 10000; | ||
this.commit = function () { | ||
this.commit = function commitAlias() { | ||
return self.write({ commit: true }); | ||
}; | ||
this.rollback = function () { | ||
this.rollback = function rollbackAlias() { | ||
return self.write({ rollback: true }); | ||
}; | ||
function processChunk(input, encoding, callback) { | ||
var statements = normalize(input); | ||
var callbacks = [callback]; | ||
var options = {metadata: metadata}; | ||
if (input.commit) { | ||
options.commit = true; | ||
function handle() { | ||
var statements = []; | ||
var callbacks = []; | ||
var options = {}; | ||
if(metadata) { | ||
options.metadata = metadata; | ||
} | ||
if (input.rollback) { | ||
options.rollback = true; | ||
} | ||
if (transactionId) { | ||
options.transactionId = transactionId; | ||
} | ||
// combine any buffered queries | ||
var buffer = self._writableState.buffer; | ||
while (buffer.length && statements.length < batchSize && !options.rollback) { | ||
var buffered = buffer.shift(); | ||
var bufferedStatements = normalize(buffered.chunk); | ||
if (bufferedStatements) { | ||
statements = statements.concat(bufferedStatements); | ||
} | ||
if (buffered.chunk.commit) { | ||
options.commit = true; | ||
} | ||
if (buffered.chunk.rollback) { | ||
options.rollback = true; | ||
} | ||
callbacks.push(buffered.callback); | ||
while(buffer.length) { | ||
var input = buffer.shift(); | ||
statements = statements.concat(normalize(input)); | ||
if (input.commit) { options.commit = true; } | ||
if (input.rollback) { options.rollback = true; } | ||
} | ||
// console.log('new CypherStream', url, statements, options); | ||
var stream = new CypherStream(url, statements, options); | ||
stream.on('transactionId', function (txId) { | ||
stream.on('transactionId', function transactionIdHandler(txId) { | ||
if (!transactionId) { | ||
@@ -72,22 +64,19 @@ transactionId = txId; | ||
stream.on('expires', function (date) { | ||
stream.on('expires', function expiresHandler(date) { | ||
self.emit('expires', date); | ||
}); | ||
stream.on('transactionExpired', function () { | ||
stream.on('transactionExpired', function transactionExpiredHandler() { | ||
self.emit('expired'); | ||
}); | ||
stream.on('data', function (data) { | ||
stream.on('data', function dataHandler(data) { | ||
self.push(data); | ||
}); | ||
stream.on('error', function (errors) { | ||
stream.on('error', function errorHandler(errors) { | ||
self.emit('error', errors); | ||
}); | ||
stream.on('end', function () { | ||
callbacks.forEach(function (callback) { | ||
callback(); | ||
}); | ||
stream.on('end', function endHandler() { | ||
if(options.rollback || options.commit) { | ||
@@ -100,6 +89,12 @@ self.push(null); | ||
this._write = function (input, encoding, done) { | ||
setTimeout(function () { | ||
processChunk(input, encoding, done); | ||
}, debounceTime); | ||
this._write = function (chunk, encoding, callback) { | ||
buffer.push(chunk); | ||
if(debounce) { clearTimeout(debounce); } | ||
// debounce to allow writes to buffer | ||
if(buffer.length === batchSize) { | ||
handle(); | ||
} else { | ||
debounce = setTimeout(handle, debounceTime); | ||
} | ||
callback(); | ||
}; | ||
@@ -106,0 +101,0 @@ this._read = function () { }; |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
261773
45
2305
7
4