cypher-stream
Advanced tools
Comparing version 0.2.0 to 0.2.1
@@ -5,2 +5,3 @@ var oboe = require('oboe'); | ||
var urlParser = require('url'); | ||
var normalize = require('./normalize-query-statement'); | ||
@@ -31,5 +32,14 @@ util.inherits(CypherStream, Readable); | ||
Readable.call(this, { objectMode: true }); | ||
statements = normalize(statements).filter(function (statement) { | ||
if(statement.commit) { | ||
options.commit = true; | ||
} | ||
if(statement.rollback) { | ||
options.rollback = true; | ||
} | ||
return statement.statement; | ||
}); | ||
// if a rollback is requested before a transactionId is acquired, we can just stop here. | ||
if(!options.transactionId && options.rollback) { | ||
// if a rollback is requested before a transactionId is acquired, we can quit early. | ||
if(options.rollback && !options.transactionId) { | ||
this.push(null); | ||
@@ -39,22 +49,5 @@ return this; | ||
// Normalize various statement syntaxes to [ { statement: statement, parameters: parameters }] | ||
// { statement: "statement" } | ||
if (!(statements instanceof Array) && typeof statements === 'object') { | ||
statements = [statements]; | ||
} | ||
// "statement" | ||
if (typeof statements === 'string') { | ||
statements = [ { statement: statements } ]; | ||
} | ||
// ["statement"] | ||
if (statements instanceof Array && typeof statements[0] === 'string') { | ||
statements = statements.map(function (statement) { | ||
return { statement: statement }; | ||
}); | ||
} | ||
if (!(statements instanceof Array) && !options.commit && !options.rollback) { | ||
throw new Error('CypherStream: No statement or commit/rollback request received.'); | ||
} | ||
var columns; | ||
var transactionTimeout; | ||
var self = this; | ||
@@ -66,3 +59,2 @@ var headers = { | ||
var transactionTimeout; | ||
@@ -89,6 +81,6 @@ var parsedUrl = urlParser.parse(databaseUrl); | ||
function transactionExpired () { | ||
self.emit('expired'); | ||
self.push(null); | ||
self.emit('transactionExpired'); | ||
} | ||
// console.log("%s %s", options.transactionId && options.rollback ? 'DELETE': 'POST', url, statements); | ||
@@ -99,6 +91,10 @@ var stream = oboe({ | ||
headers : headers, | ||
body : statements ? { statements: statements } : null, | ||
body : { statements: statements }, | ||
}); | ||
stream.on('start', function (status, headers) { | ||
stream.node('!.*', function CypherStreamAll(){ | ||
return oboe.drop; // discard records as soon as they're processed to conserve memory | ||
}); | ||
stream.on('start', function CypherStreamStart(status, headers) { | ||
if (headers.location) { | ||
@@ -109,6 +105,7 @@ self.emit('transactionId', headers.location.split('/').pop()); | ||
stream.node('!transaction.expires', function (result, path, ancestors) { | ||
stream.node('!transaction.expires', function CypherStreamTransactionExpires(date, path, ancestors) { | ||
clearTimeout(transactionTimeout); | ||
var timeTillExpire = Date.parse('Sun, 19 Oct 2014 05:06:47')-Date.now(); | ||
transactionTimeout = setTimeout(transactionExpired, timeTillExpire); | ||
var timeTillExpired = Date.parse(date)-Date.now(); | ||
transactionTimeout = setTimeout(transactionExpired, timeTillExpired); | ||
self.emit('expires', date); | ||
}); | ||
@@ -129,3 +126,3 @@ | ||
stream.done(function CypherStreamDone(complete) { | ||
stream.on('done', function CypherStreamDone(complete) { | ||
clearTimeout(transactionTimeout); | ||
@@ -138,3 +135,3 @@ if (options && options.commit || options.rollback) { | ||
stream.node('!errors[*]', function (error, path, ancestors) { | ||
stream.node('!errors[*]', function CypherStreamHandleError(error, path, ancestors) { | ||
var message = "Query Failure"; | ||
@@ -149,3 +146,3 @@ if (error.message) { | ||
stream.fail(function CypherStreamHandleError(error) { | ||
stream.on('fail', function CypherStreamHandleFailure(error) { | ||
// handle non-neo4j errors | ||
@@ -152,0 +149,0 @@ if (!error.jsonBody) { |
{ | ||
"name": "cypher-stream", | ||
"version": "0.2.0", | ||
"version": "0.2.1", | ||
"description": "Streams cypher query results in a clean format", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -51,2 +51,3 @@ # cypher-stream [![Build Status](https://travis-ci.org/brian-gates/cypher-stream.png?branch=master)](https://travis-ci.org/brian-gates/cypher-stream) [![NPM version](https://badge.fury.io/js/cypher-stream.png)](http://badge.fury.io/js/cypher-stream) [![devDependency Status](https://david-dm.org/brian-gates/cypher-stream.png?theme=shields.io)](https://david-dm.org/brian-gates/cypher-stream.png#info=devDependencies) | ||
Transactions are duplex streams that allow you to write query statements then commit or roll back the written queries. | ||
@@ -109,8 +110,7 @@ | ||
### Syncronous Batching | ||
### Query Batching | ||
Transactions automatically batch syncronous writes garnering significant performance gains. | ||
Transactions automatically batch queries for significant performance gains. Try the following: | ||
``` js | ||
var results = 0; | ||
var queriesToRun = 10000; | ||
@@ -120,10 +120,4 @@ var queriesWritten = 0; | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
console.log(result); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function() { | ||
results.should.eql(queriesToRun); | ||
done(); | ||
}) | ||
; | ||
@@ -130,0 +124,0 @@ while (queriesWritten++ < queriesToRun) { |
@@ -63,2 +63,23 @@ var should = require('should'); | ||
it('handles accepts a variety of statement formats', function (done) { | ||
var results = 0; | ||
var transaction = cypher.transaction() | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function() { | ||
results.should.eql(6); | ||
done(); | ||
}) | ||
; | ||
var query = 'match (n:Test) return n limit 1'; | ||
transaction.write(query); | ||
transaction.write({ statement: query }); | ||
transaction.write([ query, query ]); | ||
transaction.write([ { statement: query }, { statement: query } ]); | ||
transaction.commit(); | ||
}); | ||
it('handles write and commit', function (done) { | ||
@@ -176,49 +197,71 @@ var results = 0; | ||
// TODO fix this | ||
it.skip('handles transaction expiration', function (done) { | ||
var results = 0; | ||
it('emits expiration', function (done) { | ||
var called = false; | ||
var transaction = cypher.transaction() | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
.on('error', shouldNotError) | ||
.on('expires', function (date) { | ||
called = true; | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function() { | ||
results.should.eql(1); | ||
.on('end', function () { | ||
called.should.equal(true); | ||
done(); | ||
}) | ||
; | ||
timekeeper.travel(new Date(Number(new Date())+100000000000)); | ||
transaction.resume(); | ||
transaction.write('match (n:Test) return n limit 1'); | ||
transaction.commit(); | ||
}); | ||
it.skip('batches async requests with specified debounceTime', function (done) { | ||
var results = 0; | ||
var queriesToRun = 1000; | ||
var queriesWritten = 0; | ||
var transaction = cypher.transaction() | ||
// var transaction = cypher.transaction() | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function() { | ||
results.should.eql(queriesToRun); | ||
done(); | ||
}) | ||
; | ||
var interval = setInterval(function () { | ||
if (queriesWritten >= queriesToRun) { | ||
clearTimeout(interval); | ||
transaction.commit(); | ||
return; | ||
} | ||
setTimeout(function() { | ||
transaction.write('match (n:Test) return n limit 1'); | ||
queriesWritten++; | ||
}, 1); | ||
transaction.commit(); | ||
}, 0); | ||
}); | ||
it.skip('handles expiration', function (done) { | ||
var serverTimeout = 60; // set this equal to neo4j-server.properties -> org.neo4j.server.transaction.timeout (default 60) | ||
var errorCalled = false; | ||
var expiresCalled = false; | ||
var expiredCalled = false; | ||
var transaction = cypher.transaction(); | ||
this.timeout((serverTimeout+10)*1000); | ||
transaction.on('expires', function (date) { | ||
expiresCalled = true; | ||
}); | ||
transaction.on('expired', function () { | ||
expiredCalled = true; | ||
}); | ||
transaction.on('error', function (error) { | ||
errorCalled = true; | ||
error.neo4j.should.eql({ | ||
statusCode: 400, | ||
errors: [{ | ||
code : 'Neo.ClientError.Transaction.UnknownId', | ||
message : 'Unrecognized transaction id. Transaction may have timed out and been rolled back.' | ||
}], | ||
results: [] | ||
}); | ||
}); | ||
transaction.on('end', function () { | ||
errorCalled .should.equal(true); | ||
expiresCalled.should.equal(true); | ||
expiredCalled.should.equal(true); | ||
done(); | ||
}); | ||
transaction.resume(); | ||
transaction.write('match (n:Test) return n limit 1'); | ||
setTimeout(function() { | ||
transaction.write('match (n:Test) return n limit 1'); | ||
transaction.commit(); | ||
}, ((serverTimeout+5)*1000)); | ||
}); | ||
}); |
@@ -1,4 +0,5 @@ | ||
var Duplex = require('stream').Duplex; | ||
var util = require('util'); | ||
var CypherStream = require('./CypherStream'); | ||
var Duplex = require('stream').Duplex; | ||
var util = require('util'); | ||
var CypherStream = require('./CypherStream'); | ||
var normalize = require('./normalize-query-statement'); | ||
@@ -18,11 +19,11 @@ util.inherits(TransactionStream, Duplex); | ||
this.commit = function () { | ||
return this.write({ commit: true }); | ||
return self.write({ commit: true }); | ||
}; | ||
this.rollback = function () { | ||
return this.write({ rollback: true }); | ||
return self.write({ rollback: true }); | ||
}; | ||
function processChunk(input, encoding, callback) { | ||
var statements = normalizeStatementInput(input); | ||
var statements = normalize(input); | ||
var callbacks = [callback]; | ||
@@ -43,3 +44,3 @@ var options = {}; | ||
var buffered = buffer.shift(); | ||
var bufferedStatements = normalizeStatementInput(buffered.chunk); | ||
var bufferedStatements = normalize(buffered.chunk); | ||
if (bufferedStatements) { | ||
@@ -56,2 +57,3 @@ statements = statements.concat(bufferedStatements); | ||
} | ||
var stream = new CypherStream(url, statements, options); | ||
@@ -65,12 +67,18 @@ | ||
// stream.on('transactionComplete', function () { | ||
// self.push(null); | ||
// }); | ||
stream.on('expires', function (date) { | ||
self.emit('expires', date); | ||
}); | ||
stream.on('transactionExpired', function () { | ||
self.emit('expired'); | ||
}); | ||
stream.on('data', function (data) { | ||
self.push(data); | ||
}); | ||
stream.on('error', function (errors) { | ||
self.emit('error', errors); | ||
}); | ||
stream.on('end', function () { | ||
@@ -84,2 +92,3 @@ callbacks.forEach(function (callback) { | ||
}); | ||
} | ||
@@ -96,25 +105,2 @@ | ||
function normalizeStatementInput(input) { | ||
// "statement" | ||
if (typeof input === 'string') { | ||
return [{ statement: input }]; | ||
} | ||
// ["statement"] | ||
if (input instanceof Array && typeof input[0] === 'string') { | ||
return input.map(normalizeStatementInput); | ||
} | ||
// [{ statement: "statement" }] | ||
if (input instanceof Array && typeof input[0] === 'object') { | ||
return input; | ||
} | ||
// { statment: "statement" } | ||
if (input.statement) { | ||
return [{ | ||
statement : input.statement, | ||
parameters : input.parameters, | ||
}]; | ||
} | ||
} | ||
module.exports = TransactionStream; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
38552
12
676
0
126