cypher-stream
Advanced tools
Comparing version 0.3.0 to 1.0.0-alpha
'use strict'; | ||
var oboe = require('oboe'); | ||
var Readable = require('stream').Readable; | ||
var PassThrough = require('stream').PassThrough; | ||
var util = require('util'); | ||
var urlParser = require('url'); | ||
var normalize = require('./normalize-query-statement'); | ||
var moduleInfo = require('./package.json'); | ||
var $ = require('highland'); | ||
var neo4j = require('neo4j-driver').v1; | ||
var normalize = require('./util/normalize-query-statement'); | ||
var observableToStream = require('./util/observable-to-stream'); | ||
var R = require('ramda'); | ||
var Readable = require('stream').Readable; | ||
var toNative = require('./util/to-native'); | ||
util.inherits(CypherStream, Readable); | ||
var compose = R.compose; | ||
var cond = R.cond; | ||
var curry = R.curry; | ||
var has = R.has; | ||
var map = R.map; | ||
var prop = R.prop; | ||
// Options: | ||
// - transactionId: string ID for the current transaction, if there is one. | ||
// - commit: true if this query should be committed, whether it's in a | ||
// transaction or not. | ||
// - rollback: true if this transaction should be rolled back. Implies that | ||
// `commit` is *not* true, and that a `transactionId` is set. | ||
// - metadata: true if node & relationship metadata should be returned too, | ||
// not just property data. (This translates to Neo4j's REST format.) | ||
// - headers: dictionary of headers to include in this request. | ||
function CypherStream(databaseUrl, statements, options) { | ||
Readable.call(this, { objectMode: true }); | ||
statements = normalize(statements).filter(function (statement) { | ||
// Support passing in options within statement object: | ||
if(statement.commit) { | ||
options.commit = true; | ||
delete statement.commit; | ||
} | ||
if(statement.rollback) { | ||
options.rollback = true; | ||
delete statement.rollback; | ||
} | ||
if(statement.metadata) { | ||
options.metadata = true; | ||
delete statement.metadata; | ||
} | ||
if(statement.headers) { | ||
options.headers = statement.headers; | ||
delete statement.headers; | ||
} | ||
// But only count this statement object if it actually has a statement: | ||
return !!statement.statement; | ||
}); | ||
// var tap = R.tap; | ||
// var log = tap(console.log.bind(console)); | ||
// if a rollback is requested before a transactionId is acquired, we can quit early. | ||
if(options.rollback && !options.transactionId) { | ||
this.push(null); | ||
return this; | ||
} | ||
// Recursively map Neo4j values | ||
// to their native equivalants | ||
// if metadata is requested, we need to specify that on each statement: | ||
if (options.metadata) { | ||
statements.forEach(function (statement) { | ||
statement.resultDataContents = ['REST']; | ||
}); | ||
} | ||
// session => statement => observable | ||
var run = curry((runner, statement) => | ||
runner.run(statement.statement, statement.parameters) | ||
); | ||
var columns; | ||
var transactionTimeout; | ||
var self = this; | ||
var headers = { | ||
'X-Stream': true, | ||
'Accept': 'application/json', | ||
'User-Agent': 'cypher-stream/' + moduleInfo.version | ||
}; | ||
var currentStatement = 0; | ||
var callbackStream = null; | ||
var runStream = curry(compose(observableToStream, run)); | ||
var parsedUrl = urlParser.parse(databaseUrl); | ||
var emitError = R.curry((stream, error) => | ||
stream.emit('error', error) | ||
); | ||
//add HTTP basic auth if needed | ||
if (parsedUrl.auth) { | ||
headers.Authorization = 'Basic ' + new Buffer(parsedUrl.auth).toString('base64'); | ||
} | ||
var handleNeo4jError = emit => compose( | ||
map(compose( | ||
emit, | ||
error => new neo4j.Neo4jError(error.message, error.code) | ||
)), | ||
prop('fields') | ||
); | ||
//add any custom HTTP headers | ||
for (var key in options.headers || {}) { | ||
headers[key] = options.headers[key]; | ||
} | ||
var handleError = emit => cond([ | ||
[isNeo4jError, handleNeo4jError(emit)], | ||
[R.T, emit] | ||
]); | ||
if (databaseUrl[databaseUrl.length - 1] !== '/') { | ||
databaseUrl += '/'; // ensure trailing slash | ||
var isNeo4jError = R.has('fields'); | ||
class CypherStream extends Readable { | ||
constructor(runner, statements) { | ||
super({ objectMode: true }); | ||
this.statements = statements; | ||
this.runner = runner; | ||
this.start(); | ||
} | ||
var url = databaseUrl+'db/data/transaction'; | ||
if (options && options.transactionId) { | ||
url += '/'+options.transactionId; | ||
} | ||
if (options && options.commit) { | ||
url += '/commit'; | ||
} | ||
function transactionExpired () { | ||
self.emit('transactionExpired'); | ||
start() { | ||
$([this.statements]) | ||
.flatMap(normalize) | ||
.filter(has('statement')) | ||
.flatMap(statement => { | ||
var stream = | ||
runStream(this.runner, statement) | ||
.map(toNative); | ||
if(statement.callback) { | ||
statement.callback(stream.observe()); | ||
} | ||
return stream; | ||
}) | ||
.errors(handleError(emitError(this))) | ||
.doto(x => this.push(x)) | ||
.on('end', () => this.push(null)) | ||
.resume() | ||
; | ||
} | ||
// console.log('%s %s', options.transactionId && options.rollback ? 'DELETE': 'POST', url, JSON.stringify(statements)); | ||
var stream = oboe({ | ||
url : url, | ||
method : options.transactionId && options.rollback ? 'DELETE': 'POST', | ||
headers : headers, | ||
body : { statements: statements }, | ||
}); | ||
stream.node('!.*', function CypherStreamAll(){ | ||
return oboe.drop; // discard records as soon as they're processed to conserve memory | ||
}); | ||
stream.on('start', function CypherStreamStart(status, headers) { | ||
if (headers.location) { | ||
self.emit('transactionId', headers.location.split('/').pop()); | ||
} | ||
}); | ||
stream.node('!transaction.expires', function CypherStreamTransactionExpires(date) { | ||
clearTimeout(transactionTimeout); | ||
var timeTillExpired = Date.parse(date)-Date.now(); | ||
transactionTimeout = setTimeout(transactionExpired, timeTillExpired); | ||
self.emit('expires', date); | ||
}); | ||
stream.path('!results[*]', function CypherStreamResult() { | ||
if (callbackStream) { | ||
self.unpipe(callbackStream); | ||
callbackStream.end(); | ||
callbackStream = null; | ||
} | ||
if (statements[currentStatement].callback) { | ||
callbackStream = new PassThrough({ objectMode: true }); | ||
statements[currentStatement].callback(callbackStream); | ||
} | ||
currentStatement++; | ||
}); | ||
stream.node('!results[*].columns', function CypherStreamNodeColumns(c) { | ||
self.emit('columns', c); | ||
columns = c; | ||
}); | ||
var dataSelector = '!results[*].data[*].' + (options.metadata ? 'rest' : 'row'); | ||
stream.node(dataSelector, function CypherStreamNodeData(result) { | ||
var data = {}; | ||
columns.forEach(function (column, i) { | ||
data[column] = result[i]; | ||
}); | ||
if (callbackStream) { | ||
callbackStream.write(data); | ||
} | ||
self.push(data); | ||
}); | ||
stream.on('done', function CypherStreamDone() { | ||
clearTimeout(transactionTimeout); | ||
if (options && options.commit || options.rollback) { | ||
self.emit('transactionComplete'); | ||
} | ||
if (callbackStream) { | ||
callbackStream.end(); | ||
} | ||
self.push(null); | ||
}); | ||
stream.node('!errors[*]', function CypherStreamHandleError(error) { | ||
var message = 'Query Failure'; | ||
if (error.message) { | ||
message += ': ' + error.message; | ||
} | ||
var err = new Error(message); | ||
err.code = error.code; | ||
self.emit('error', err); | ||
}); | ||
stream.on('fail', function CypherStreamHandleFailure(error) { | ||
var err; | ||
// handle non-neo4j errors | ||
if (!error.jsonBody) { | ||
// pass the Error instance through, creating one if necessary | ||
err = error.thrown || new Error('Neo4j ' + error.statusCode); | ||
err.statusCode = error.statusCode; | ||
err.body = error.body; | ||
err.jsonBody = error.jsonBody; | ||
self.emit('error', err); | ||
self.push(null); | ||
return; | ||
} | ||
// handle neo4j errors | ||
var message = 'Query failure'; | ||
var statusCode = 400; | ||
if (error.jsonBody.message) { | ||
message += ': '+error.jsonBody.message; | ||
} | ||
if (error.jsonBody.statusCode) { | ||
statusCode = error.jsonBody.statusCode; | ||
} | ||
err = new Error(message); | ||
err.neo4j = error.jsonBody; | ||
err.neo4j.statusCode = statusCode; | ||
self.emit('error', err); | ||
self.push(null); | ||
}); | ||
this._read = function () { }; | ||
_read() {} | ||
} | ||
module.exports = CypherStream; |
44
index.js
@@ -0,16 +1,44 @@ | ||
'use strict'; | ||
var CypherStream = require('./CypherStream'); | ||
var neo4j = require('neo4j-driver').v1; | ||
var R = require('ramda'); | ||
var TransactionStream = require('./TransactionStream'); | ||
module.exports = function Connection(url) { | ||
var factory = function CypherStreamFactory(query, params) { | ||
var statements = query; | ||
if (params) { | ||
statements = [ { statement: query, parameters: params } ]; | ||
var all = R.all; | ||
var compose = R.compose; | ||
var cond = R.cond; | ||
var isNil = R.isNil; | ||
var not = R.not; | ||
var unapply = R.unapply; | ||
var always = R.always; | ||
var notNil = compose(not, isNil); | ||
// (user, pass) => auth || undefined | ||
var auth = cond([ | ||
[unapply(all(notNil)), neo4j.auth.basic ], | ||
[R.T, always(undefined)], | ||
]); | ||
module.exports = function Connection(url, options) { | ||
options = options || {}; | ||
var driver = neo4j.driver(url || 'bolt://localhost', auth(options.username, options.password)); | ||
var factory = function CypherStreamFactory(statement, parameters) { | ||
if (parameters) { | ||
statement = [ { statement, parameters } ]; | ||
} | ||
return new CypherStream(url, statements, { commit: true }); | ||
var session = driver.session(); | ||
return new CypherStream(session, statement) | ||
.on('end', () => session.close()); | ||
}; | ||
factory.transaction = function (options) { | ||
return new TransactionStream(url, options); | ||
factory.transaction = options => { | ||
var session = driver.session(); | ||
return new TransactionStream(session, options) | ||
.on('end', () => session.close()); | ||
}; | ||
return factory; | ||
}; |
{ | ||
"name": "cypher-stream", | ||
"version": "0.3.0", | ||
"version": "1.0.0-alpha", | ||
"description": "Streams cypher query results in a clean format", | ||
@@ -25,3 +25,6 @@ "main": "index.js", | ||
"dependencies": { | ||
"oboe": "^2.1.1" | ||
"highland": "^2.9.0", | ||
"neo4j-driver": "^1.0.3", | ||
"oboe": "^2.1.1", | ||
"ramda": "^0.21.0" | ||
}, | ||
@@ -28,0 +31,0 @@ "devDependencies": { |
'use strict'; | ||
var should = require('should'); | ||
var cypher = require('../index')('http://localhost:7474'); | ||
var http = require('http'); | ||
var cypher = require('../index')('bolt://0.0.0.0'); | ||
var should = require('should'); | ||
var R = require('ramda'); | ||
function shouldNotError(error) { | ||
should.not.exist(error); | ||
} | ||
var shouldNotError = error => should.not.exist(error); | ||
describe('Cypher stream', function () { | ||
describe('Cypher stream', () => { | ||
var testRecordsToCreate = 10; | ||
before(function (done){ | ||
before(function(done) { | ||
// Travis CI is slow. Give him more time. | ||
@@ -18,148 +17,122 @@ if (process.env.TRAVIS_CI) { | ||
cypher('FOREACH (x IN range(1,'+testRecordsToCreate+') | CREATE(:Test {test: true}))') | ||
.on('end', done) | ||
.on('error', shouldNotError) | ||
.resume(); | ||
.on('end', done) | ||
.on('error', shouldNotError) | ||
.resume(); | ||
}); | ||
after(function (done){ | ||
after(done => { | ||
cypher('MATCH (n:Test) DELETE n') | ||
.on('end', done) | ||
.on('error', shouldNotError) | ||
.resume(); | ||
.on('end', done) | ||
.on('error', shouldNotError) | ||
.resume(); | ||
}); | ||
it('works', function (done) { | ||
it('works', done => { | ||
var results = 0; | ||
cypher('match (n:Test) return n limit 10') | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function () { | ||
results.should.eql(10); | ||
done(); | ||
}) | ||
.on('data', result => { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', () => { | ||
results.should.eql(10); | ||
done(); | ||
}) | ||
; | ||
}); | ||
it('handles errors', function (done) { | ||
it('handles errors', done => { | ||
var errored = false; | ||
cypher('invalid query') | ||
.on('error', function (error) { | ||
errored = true; | ||
String(error).should.equal('Error: Query Failure: Invalid input \'i\': expected <init> (line 1, column 1)\n"invalid query"\n ^'); | ||
}) | ||
.on('end', function () { | ||
errored.should.be.true; | ||
done(); | ||
}) | ||
.resume() // need to manually start it since we have no on('data') | ||
.on('error', error => { | ||
errored = true; | ||
should.equal( | ||
error.code, | ||
'Neo.ClientError.Statement.SyntaxError' | ||
); | ||
should.equal( | ||
error.message, | ||
'Invalid input \'i\': expected <init> (line 1, column 1 (offset: 0))\n"invalid query"\n ^' | ||
); | ||
}) | ||
.on('end', () => { | ||
should.equal(true, errored); | ||
done(); | ||
}) | ||
.resume() // need to manually start it since we have no on('data') | ||
; | ||
}); | ||
it('handles non-neo4j errors', function (done) { | ||
var errored = false; | ||
var expectedError = new Error('Test'); | ||
cypher('match (n:Test) return n limit 1') | ||
.on('data', function () { | ||
throw expectedError; | ||
}) | ||
.on('error', function (error) { | ||
errored = true; | ||
error.should.equal(expectedError); | ||
}) | ||
.on('end', function () { | ||
errored.should.be.true; | ||
done(); | ||
}) | ||
; | ||
}); | ||
it('returns non-object values', function (done) { | ||
it('returns non-object values', done => { | ||
cypher('match (n:Test) return n.test as test limit 1') | ||
.on('data', function (result) { | ||
result.should.eql({ test: true }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', done) | ||
.on('data', result => result.should.eql({ test: true })) | ||
.on('error', shouldNotError) | ||
.on('end', done) | ||
; | ||
}); | ||
it('returns collections', function (done) { | ||
it('returns collections', done => { | ||
cypher('match (n:Test) return collect(n) as nodes limit 1') | ||
.on('data', function (result) { | ||
// 10x { test: true } | ||
result.should.eql({ nodes: [{ test: true }, { test: true }, { test: true }, { test: true }, { test: true }, { test: true }, { test: true }, { test: true }, { test: true }, { test: true } ] }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', done) | ||
.on('data', result => { | ||
result.should.eql({ | ||
nodes: R.times(R.always({ test: true }), 10) | ||
}); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', done) | ||
; | ||
}); | ||
it('returns non-node collections', function (done) { | ||
it('returns non-node collections', done => { | ||
cypher('match (n:Test) return labels(n) as labels limit 1') | ||
.on('data', function (result) { | ||
result.should.eql({ labels: ['Test']}); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', done) | ||
.on('data', result => result.should.eql({ labels: ['Test']})) | ||
.on('error', shouldNotError) | ||
.on('end', done) | ||
; | ||
}); | ||
it('recursively returns data values', function (done) { | ||
cypher('match (n:Test) return { child: { grandchild: n }} as parent limit 1') | ||
.on('data', function (result) { | ||
result.should.eql({ parent: { child: { grandchild: { test: true } } } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', done) | ||
it('returns relationships', done => { | ||
cypher(` | ||
CREATE ()-[a:somerel { foo: 'bar' }]->() | ||
RETURN a | ||
`) | ||
.on('data', result => result.should.eql({ a: { foo: 'bar' } })) | ||
.on('error', shouldNotError) | ||
.on('end', done) | ||
; | ||
}); | ||
it('handles null', function (done) { | ||
cypher('return null') | ||
.on('data', function (result) { | ||
result.should.eql({ 'null': null }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', done) | ||
it('recursively returns data values', done => { | ||
cypher('match (n:Test) return { child: { grandchild: n }} as parent limit 1') | ||
.on('data', result => result.should.eql({ parent: { child: { grandchild: { test: true } } } })) | ||
.on('error', shouldNotError) | ||
.on('end', done) | ||
; | ||
}); | ||
it('works with trailing slash', function (done){ | ||
var cyp = require('../index')('http://localhost:7474/'); | ||
cyp('match (n:Test) return n limit 1') | ||
.on('error', shouldNotError) | ||
.on('end', done) | ||
.resume() | ||
it('handles null', done => { | ||
cypher('return null') | ||
.on('data', result => result.should.eql({ 'null': null })) | ||
.on('error', shouldNotError) | ||
.on('end', done) | ||
; | ||
}); | ||
it('works with basic http auth', function (done){ | ||
var cyp = require('../index')('http://neo:cypher@localhost:7474/'); | ||
cyp('match (n:Test) return n limit 1') | ||
.on('error', shouldNotError) | ||
.on('end', done) | ||
.resume() | ||
; | ||
}); | ||
it('works with parameters', function (done) { | ||
it('works with parameters', done => { | ||
var results = 0; | ||
cypher('match (n:Test) where n.test={test} return n limit 1', { test: true }) | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function () { | ||
results.should.eql(1); | ||
done(); | ||
}) | ||
.on('data', result => { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', () => { | ||
results.should.eql(1); | ||
done(); | ||
}) | ||
; | ||
}); | ||
it('handles accepts a variety of statement formats', function (done) { | ||
it('handles accepts a variety of statement formats', done => { | ||
var results = 0; | ||
@@ -172,4 +145,4 @@ var query = 'match (n:Test) return n limit 1'; | ||
cypher([ { statement: query }, { statement: query } ]), | ||
].forEach(function (stream) { | ||
stream.on('data', function (result) { | ||
].forEach(stream => { | ||
stream.on('data', result => { | ||
results++; | ||
@@ -184,3 +157,3 @@ result.should.eql({ n: { test: true } }); | ||
it('calls statement callbacks', function (done) { | ||
it('calls statement callbacks', done => { | ||
var results = 0; | ||
@@ -190,16 +163,17 @@ var calls = 0; | ||
var query = 'match (n:Test) return n limit 2'; | ||
function callback(stream) { | ||
var callback = stream => { | ||
stream | ||
.on('data', function (result) { | ||
result.should.eql({ n: { test: true } }); | ||
results++; | ||
}) | ||
.on('end', function () { | ||
ended++; | ||
}) | ||
.on('data', result => { | ||
result.should.eql({ n: { test: true } }); | ||
results++; | ||
}) | ||
.on('end', () => { | ||
ended++; | ||
}) | ||
; | ||
calls++; | ||
} | ||
}; | ||
var statement = { statement: query, callback: callback }; | ||
cypher([ statement, statement ]).on('end', function () { | ||
cypher([ statement, statement ]) | ||
.on('end', () => { | ||
calls.should.equal(2); | ||
@@ -209,81 +183,6 @@ ended.should.equal(2); | ||
done(); | ||
}).resume(); | ||
}); | ||
it('supports node/rel metadata', function (done) { | ||
var results = 0; | ||
cypher({ | ||
statement: 'match (n:Test) return n limit 1', | ||
metadata: true | ||
}) | ||
.on('data', function (result) { | ||
results++; | ||
result.should.be.type('object'); | ||
result.n.should.be.type('object'); | ||
result.n.data.should.eql({ test: true }); | ||
result.n.self.should.be.type('string'); | ||
result.n.metadata.should.be.type('object'); | ||
result.n.metadata.id.should.be.type('number'); | ||
result.n.metadata.labels.should.eql(['Test']); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function () { | ||
results.should.eql(1); | ||
done(); | ||
}) | ||
; | ||
.resume(); | ||
}); | ||
it('supports custom headers', function (done) { | ||
var headers = { | ||
'X-Foo': 'Bar', | ||
'x-lorem': 'ipsum' | ||
}; | ||
// for this test, use a mock server to test that headers were actually sent, | ||
// but then proxy the request to Neo4j afterward. | ||
// TODO: use https://github.com/pgte/nock? not sure how to check request | ||
// headers though (not just match them). | ||
var server = http.createServer(function (req, res) { | ||
req.headers['x-foo'].should.equal('Bar'); | ||
req.headers['x-lorem'].should.equal('ipsum'); | ||
// proxy request to real Neo4j server: | ||
req.pipe(http.request({ | ||
hostname: 'localhost', | ||
port: 7474, | ||
method: req.method, | ||
path: req.url, | ||
headers: req.headers | ||
}, function (neo4jRes) { | ||
res.writeHead(neo4jRes.statusCode, neo4jRes.headers); | ||
neo4jRes.pipe(res); | ||
})); | ||
// and shut down the server now (so Node can cleanly exit): | ||
server.close(); | ||
}); | ||
server.listen(0, function () { | ||
var url = 'http://localhost:' + server.address().port; | ||
var cypher = require('../')(url); | ||
var results = 0; | ||
cypher({ | ||
statement: 'match (n:Test) return n limit 1', | ||
headers: headers | ||
}) | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function () { | ||
results.should.eql(1); | ||
done(); | ||
}) | ||
; | ||
}); | ||
}); | ||
}); |
'use strict'; | ||
var should = require('should'); | ||
var normalize = require('../normalize-query-statement'); | ||
var normalize = require('../util/normalize-query-statement'); | ||
@@ -5,0 +5,0 @@ describe('Query statement normalization', function () { |
'use strict'; | ||
var should = require('should'); | ||
var cypher = require('../index')('http://localhost:7474'); | ||
var http = require('http'); | ||
var should = require('should'); | ||
var cypher = require('../index')('bolt://0.0.0.0'); | ||
@@ -11,2 +10,3 @@ function shouldNotError(error) { | ||
describe('Transaction', function () { | ||
var testRecordsToCreate = 10; | ||
@@ -19,25 +19,54 @@ before(function (done){ | ||
cypher('FOREACH (x IN range(1,'+testRecordsToCreate+') | CREATE(:Test {test: true}))') | ||
.on('end', done) | ||
.on('error', shouldNotError) | ||
.resume(); | ||
.on('end', done) | ||
.on('error', shouldNotError) | ||
.resume(); | ||
}); | ||
after(function (done){ | ||
after(done => | ||
cypher('MATCH (n:Test) DELETE n') | ||
.on('end', done) | ||
.on('error', shouldNotError) | ||
.resume(); | ||
.on('end', done) | ||
.on('error', shouldNotError) | ||
.resume() | ||
); | ||
it('cannot write after commit (throw)', done => { | ||
var tx = cypher.transaction(); | ||
tx.commit(); | ||
try { | ||
tx.write('match (n:Test) return n limit 1'); | ||
} catch (error) { | ||
should.equal( | ||
'Error: Cannot write after commit.', | ||
String(error) | ||
); | ||
} | ||
tx.on('end', done); | ||
tx.resume(); | ||
}); | ||
it('cannot write after commit (emit)', done => { | ||
var tx = cypher.transaction(); | ||
tx.commit(); | ||
tx.on('error', error => | ||
should.equal( | ||
'Error: Cannot write after commit.', | ||
String(error) | ||
) | ||
); | ||
tx.on('end', done); | ||
tx.resume(); | ||
}); | ||
it('works', function (done) { | ||
var results = 0; | ||
var transaction = cypher.transaction() | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function() { | ||
results.should.eql(1); | ||
done(); | ||
}) | ||
.on('data', result => { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function() { | ||
results.should.eql(1); | ||
done(); | ||
}) | ||
; | ||
@@ -48,14 +77,116 @@ transaction.write('match (n:Test) return n limit 1'); | ||
context('data written within a transaction', () => { | ||
var tx; | ||
beforeEach(() => { | ||
tx = cypher.transaction(); | ||
tx.write('create (n:NewItem { foo: "bar"}) return n'); | ||
}); | ||
afterEach(done => | ||
cypher(` | ||
MATCH (n:NewItem) | ||
DETACH DELETE n | ||
`) | ||
.on('end', done) | ||
.resume() | ||
); | ||
it('is unavailable to parallel transactions', done => { | ||
var tx2 = cypher.transaction(); | ||
var called = false; | ||
tx.on('data', () => { | ||
tx2.write('MATCH (n:NewItem) return n'); | ||
tx2.on('data', () => called = true); | ||
tx2.on('end', () => { | ||
should.equal(false, called); | ||
tx.rollback(); | ||
}); | ||
tx2.commit(); | ||
}); | ||
tx.on('end', done); | ||
tx2.resume(); | ||
tx.resume(); | ||
}); | ||
// TODO: look into this. | ||
it.skip('Uncaught Read operations are not allowed for `NONE` transactions', done => { | ||
var tx2 = cypher.transaction(); | ||
var called = false; | ||
tx.on('data', () => { | ||
console.log('here'); | ||
tx2.write('MATCH (n:NewItem) return n'); | ||
tx2.on('data', () => called = true); | ||
tx2.on('end', () => should.equal(false, called)); | ||
}); | ||
tx2.on('end', done); | ||
tx2.resume(); | ||
tx2.commit(); | ||
tx.resume(); | ||
// tx.commit(); | ||
}); | ||
it('is available after commit', done => { | ||
var tx = cypher.transaction(); | ||
var numToCreate = 10; | ||
for(var i = 0; i < numToCreate; i++) { | ||
tx.write('create (n:NewItem { foo: "bar"}) return n'); | ||
} | ||
var numResults = 0; | ||
tx.on('end', () => { | ||
cypher('match (n:NewItem) return n') | ||
.on('data', data => { | ||
should.equal('bar', data.n.foo); | ||
numResults++; | ||
}) | ||
.on('end', () => { | ||
should.equal(numToCreate, numResults); | ||
done(); | ||
}); | ||
}); | ||
tx.commit(); | ||
tx.resume(); | ||
}); | ||
it('is unavailable outside the transaction before commit', done => { | ||
var tx = cypher.transaction(); | ||
tx.write('create (n:NewItem { foo: "bar"}) return n'); | ||
tx.on('data', () => { | ||
var called = false; | ||
cypher('match (n:NewItem) return n') | ||
.on('data', () => called = true) | ||
.on('end', () => { | ||
should.equal(false, called); | ||
tx.commit(); | ||
}); | ||
}); | ||
tx.on('end', done); | ||
tx.resume(); | ||
}); | ||
}); | ||
it('handles multiple writes', function (done) { | ||
var results = 0; | ||
var transaction = cypher.transaction() | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function() { | ||
results.should.eql(2); | ||
done(); | ||
}) | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function() { | ||
results.should.eql(2); | ||
done(); | ||
}) | ||
; | ||
@@ -67,14 +198,14 @@ transaction.write('match (n:Test) return n limit 1'); | ||
it('handles accepts a variety of statement formats', function (done) { | ||
it('accepts a variety of statement formats', function (done) { | ||
var results = 0; | ||
var transaction = cypher.transaction() | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function() { | ||
results.should.eql(6); | ||
done(); | ||
}) | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function() { | ||
results.should.eql(6); | ||
done(); | ||
}) | ||
; | ||
@@ -92,11 +223,11 @@ var query = 'match (n:Test) return n limit 1'; | ||
var transaction = cypher.transaction() | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function() { | ||
results.should.eql(1); | ||
done(); | ||
}) | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function() { | ||
results.should.eql(1); | ||
done(); | ||
}) | ||
; | ||
@@ -106,47 +237,20 @@ transaction.write({ statement: 'match (n:Test) return n limit 1', commit: true }); | ||
it('automatically batches queries for performance', function (done) { | ||
// results may vary, depending on your system. | ||
// tests on macbook pro were around ~100ms | ||
// Travis CI is slow. Give him more time. | ||
if (process.env.TRAVIS_CI) { | ||
this.timeout(5000); | ||
} | ||
var results = 0; | ||
var queriesToRun = 1000; | ||
var queriesWritten = 0; | ||
var transaction = cypher.transaction() | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function() { | ||
results.should.eql(queriesToRun); | ||
done(); | ||
}) | ||
; | ||
while (queriesWritten++ < queriesToRun) { | ||
transaction.write('match (n:Test) return n limit 1'); | ||
} | ||
transaction.commit(); | ||
}); | ||
it('can eagerly rollback if queries are still buffered', function (done) { | ||
var results = 0; | ||
var transaction = cypher.transaction() | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function () { | ||
results.should.eql(0); | ||
cypher('match (n:Test) where n.foo = "bar" or n.bar = "baz" return count(n) as count') | ||
.on('data', function (result) { | ||
result.count.should.equal(0); | ||
done(); | ||
}) | ||
.on('error', shouldNotError) | ||
; | ||
}) | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function () { | ||
results.should.eql(0); | ||
cypher('match (n:Test) where n.foo = "bar" or n.bar = "baz" return count(n) as count') | ||
.on('data', result => { | ||
result.count.should.equal(0); | ||
}) | ||
.on('end', done) | ||
.on('error', shouldNotError) | ||
; | ||
}) | ||
; | ||
@@ -162,16 +266,16 @@ transaction.write('match (n:Test) set n.foo = "bar" return n'); | ||
var transaction = cypher.transaction() | ||
.on('data', function () { | ||
results++; | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function () { | ||
results.should.eql(testRecordsToCreate*2); | ||
cypher('match (n:Test) where n.foo = "bar" or n.bar = "baz" return count(n) as count') | ||
.on('data', function (result) { | ||
result.count.should.equal(0); | ||
done(); | ||
}) | ||
.on('error', shouldNotError) | ||
; | ||
}) | ||
.on('data', function () { | ||
results++; | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function () { | ||
results.should.eql(testRecordsToCreate*2); | ||
cypher('match (n:Test) where n.foo = "bar" or n.bar = "baz" return count(n) as count') | ||
.on('data', function (result) { | ||
result.count.should.equal(0); | ||
done(); | ||
}) | ||
.on('error', shouldNotError) | ||
; | ||
}) | ||
; | ||
@@ -188,11 +292,11 @@ transaction.write('match (n:Test) set n.foo = "bar" return n'); | ||
var transaction = cypher.transaction() | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function () { | ||
results.should.eql(1); | ||
done(); | ||
}) | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function () { | ||
results.should.eql(1); | ||
done(); | ||
}) | ||
; | ||
@@ -206,72 +310,2 @@ transaction.write({ | ||
it('emits expiration', function (done) { | ||
var called = false; | ||
var transaction = cypher.transaction() | ||
.on('error', shouldNotError) | ||
.on('expires', function () { | ||
called = true; | ||
}) | ||
.on('end', function () { | ||
called.should.equal(true); | ||
done(); | ||
}) | ||
; | ||
transaction.resume(); | ||
transaction.write('match (n:Test) return n limit 1'); | ||
setTimeout(function() { | ||
transaction.write('match (n:Test) return n limit 1'); | ||
transaction.commit(); | ||
}, 0); | ||
}); | ||
it.skip('handles expiration', function (done) { | ||
// set this equal to neo4j-server.properties -> org.neo4j.server.transaction.timeout (default 60) | ||
var serverTimeout = 60; | ||
var errorCalled = false; | ||
var expiresCalled = false; | ||
var expiredCalled = false; | ||
var transaction = cypher.transaction(); | ||
this.timeout((serverTimeout+10)*1000); | ||
transaction.on('expires', function () { | ||
expiresCalled = true; | ||
}); | ||
transaction.on('expired', function () { | ||
expiredCalled = true; | ||
}); | ||
transaction.on('error', function (error) { | ||
errorCalled = true; | ||
error.neo4j.should.eql({ | ||
statusCode: 400, | ||
errors: [{ | ||
code : 'Neo.ClientError.Transaction.UnknownId', | ||
message : 'Unrecognized transaction id. Transaction may have timed out and been rolled back.' | ||
}], | ||
results: [] | ||
}); | ||
}); | ||
transaction.on('end', function () { | ||
errorCalled .should.equal(true); | ||
expiresCalled.should.equal(true); | ||
expiredCalled.should.equal(true); | ||
done(); | ||
}); | ||
transaction.resume(); | ||
transaction.write('match (n:Test) return n limit 1'); | ||
setTimeout(function() { | ||
transaction.write('match (n:Test) return n limit 1'); | ||
transaction.commit(); | ||
}, ((serverTimeout+5)*1000)); | ||
}); | ||
it('calls statement callbacks', function (done) { | ||
@@ -284,9 +318,9 @@ var results = 0; | ||
stream | ||
.on('data', function (result) { | ||
result.should.eql({ n: { test: true } }); | ||
results++; | ||
}) | ||
.on('end', function () { | ||
ended++; | ||
}) | ||
.on('data', function (result) { | ||
result.should.eql({ n: { test: true } }); | ||
results++; | ||
}) | ||
.on('end', function () { | ||
ended++; | ||
}) | ||
; | ||
@@ -309,81 +343,2 @@ calls++; | ||
it('supports node/rel metadata', function (done) { | ||
var results = 0; | ||
var transaction = cypher.transaction({ metadata: true }) | ||
.on('data', function (result) { | ||
results++; | ||
result.should.be.type('object'); | ||
result.n.should.be.type('object'); | ||
result.n.data.should.eql({ test: true }); | ||
result.n.self.should.be.type('string'); | ||
result.n.metadata.should.be.type('object'); | ||
result.n.metadata.id.should.be.type('number'); | ||
result.n.metadata.labels.should.eql(['Test']); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function() { | ||
results.should.eql(1); | ||
done(); | ||
}) | ||
; | ||
transaction.write('match (n:Test) return n limit 1'); | ||
transaction.commit(); | ||
}); | ||
// NOTE: This support is basic -- assumes only one query per request. | ||
// https://github.com/brian-gates/cypher-stream/issues/10#issuecomment-72090757 | ||
it('supports custom headers', function (done) { | ||
var headers = { | ||
'X-Foo': 'Bar', | ||
'x-lorem': 'ipsum' | ||
}; | ||
// for this test, use a mock server to test that headers were actually sent, | ||
// but then proxy the request to Neo4j afterward. | ||
// TODO: use https://github.com/pgte/nock? not sure how to check request | ||
// headers though (not just match them). | ||
var server = http.createServer(function (req, res) { | ||
req.headers['x-foo'].should.equal('Bar'); | ||
req.headers['x-lorem'].should.equal('ipsum'); | ||
// proxy request to real Neo4j server: | ||
req.pipe(http.request({ | ||
hostname: 'localhost', | ||
port: 7474, | ||
method: req.method, | ||
path: req.url, | ||
headers: req.headers | ||
}, function (neo4jRes) { | ||
res.writeHead(neo4jRes.statusCode, neo4jRes.headers); | ||
neo4jRes.pipe(res); | ||
})); | ||
// and shut down the server now (so Node can cleanly exit): | ||
server.close(); | ||
}); | ||
server.listen(0, function () { | ||
var url = 'http://localhost:' + server.address().port; | ||
var cypher = require('../')(url); | ||
var results = 0; | ||
var transaction = cypher.transaction() | ||
.on('data', function (result) { | ||
results++; | ||
result.should.eql({ n: { test: true } }); | ||
}) | ||
.on('error', shouldNotError) | ||
.on('end', function () { | ||
results.should.eql(1); | ||
done(); | ||
}) | ||
; | ||
transaction.write({ | ||
statement: 'match (n:Test) return n limit 1', | ||
headers: headers | ||
}); | ||
transaction.commit(); | ||
}); | ||
}); | ||
}); |
'use strict'; | ||
var Duplex = require('stream').Duplex; | ||
var util = require('util'); | ||
var CypherStream = require('./CypherStream'); | ||
var normalize = require('./normalize-query-statement'); | ||
var $ = require('highland'); | ||
var CypherStream = require('./CypherStream'); | ||
var Duplex = require('stream').Duplex; | ||
var normalize = require('./util/normalize-query-statement'); | ||
// var R = require('ramda'); | ||
util.inherits(TransactionStream, Duplex); | ||
// var tap = R.tap; | ||
// var log = tap(console.log.bind(console)); | ||
// Options: | ||
// - debounceTime: number of milliseconds to wait between queries to collect and | ||
// batch request them. | ||
// - batchSize: maximimum number of queries to send at a time. | ||
// - metadata: true if node & relationship metadata should be returned too, | ||
// not just property data. (This translates to Neo4j's REST format.) | ||
function TransactionStream(url, options) { | ||
Duplex.call(this, { objectMode: true }); | ||
class TransactionStream extends Duplex { | ||
constructor(session) { | ||
super({ objectMode: true }); | ||
var self = this; | ||
var buffer = []; | ||
var transactionId; | ||
var debounce; | ||
var debounceTime = options && options.debounceTime || 0; | ||
var batchSize = options && options.batchSize || 10000; | ||
var metadata = options && options.metadata; | ||
this.session = session; | ||
this.tx = session.beginTransaction(); | ||
this.statements = $(); | ||
this.commit = function commitAlias() { | ||
return self.write({ commit: true }); | ||
}; | ||
this.writes = this.statements.fork() | ||
.flatMap(normalize) | ||
.map(statement => { | ||
if(statement.commit) { | ||
this.commit(); | ||
} | ||
return $(new CypherStream(this.tx, statement)); | ||
}) | ||
; | ||
this.rollback = function rollbackAlias() { | ||
return self.write({ rollback: true }); | ||
}; | ||
this.results = this.writes.fork() | ||
.flatten() | ||
.doto(x => this.push(x)) | ||
.errors(error => this.emit('error', error)) | ||
; | ||
function handle() { | ||
var statements = []; | ||
var callbacks = []; | ||
var options = {}; | ||
this.writes.resume(); | ||
this.results.resume(); | ||
} | ||
if(metadata) { | ||
options.metadata = metadata; | ||
_write(chunk, encoding, callback) { | ||
if(this.rolledBack) { | ||
throw new Error('Cannot write after rollback.'); | ||
} | ||
if (transactionId) { | ||
options.transactionId = transactionId; | ||
if(this.committed) { | ||
throw new Error('Cannot write after commit.'); | ||
} | ||
while(buffer.length) { | ||
var input = buffer.shift(); | ||
statements = statements.concat(normalize(input)); | ||
if (input.commit) { options.commit = true; } | ||
if (input.rollback) { options.rollback = true; } | ||
} | ||
this.statements.write(chunk); | ||
callback(); | ||
} | ||
// console.log('new CypherStream', url, statements, options); | ||
var stream = new CypherStream(url, statements, options); | ||
_read() { } | ||
stream.on('transactionId', function transactionIdHandler(txId) { | ||
if (!transactionId) { | ||
transactionId = txId; | ||
} | ||
}); | ||
commit() { | ||
if(this.committed) { | ||
return; | ||
} | ||
this.committed = true; | ||
stream.on('expires', function expiresHandler(date) { | ||
self.emit('expires', date); | ||
this.writes.on('end', () => { | ||
this.tx.commit() | ||
.subscribe({ | ||
onCompleted: () => { | ||
this.emit('comitted'); | ||
this.push(null); | ||
}, | ||
onError: error => { | ||
this.emit('error', error); | ||
this.push(null); | ||
} | ||
}); | ||
}); | ||
stream.on('transactionExpired', function transactionExpiredHandler() { | ||
self.emit('expired'); | ||
}); | ||
this.statements.end(); | ||
} | ||
stream.on('data', function dataHandler(data) { | ||
self.push(data); | ||
}); | ||
rollback() { | ||
if(this.rolledBack) { | ||
return; | ||
} | ||
this.rolledBack = true; | ||
stream.on('error', function errorHandler(errors) { | ||
self.emit('error', errors); | ||
}); | ||
this.statements.end(); | ||
this.results.end(); | ||
this.writes.end(); | ||
stream.on('end', function endHandler() { | ||
if(options.rollback || options.commit) { | ||
self.push(null); | ||
this.tx.rollback() | ||
.subscribe({ | ||
onCompleted: () => { | ||
this.push(null); | ||
}, | ||
onError: error => { | ||
this.emit('error', error); | ||
this.push(null); | ||
} | ||
@@ -86,17 +98,4 @@ }); | ||
this._write = function (chunk, encoding, callback) { | ||
buffer.push(chunk); | ||
if(debounce) { clearTimeout(debounce); } | ||
// debounce to allow writes to buffer | ||
if(buffer.length === batchSize) { | ||
handle(); | ||
} else { | ||
debounce = setTimeout(handle, debounceTime); | ||
} | ||
callback(); | ||
}; | ||
this._read = function () { }; | ||
} | ||
module.exports = TransactionStream; |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
3
1
189700
4
28
813
+ Addedhighland@^2.9.0
+ Addedneo4j-driver@^1.0.3
+ Addedramda@^0.21.0
+ Added@babel/runtime@7.26.0(transitive)
+ Addedhighland@2.13.5(transitive)
+ Addedneo4j-driver@1.7.8(transitive)
+ Addedpunycode@2.3.1(transitive)
+ Addedramda@0.21.0(transitive)
+ Addedregenerator-runtime@0.14.1(transitive)
+ Addedtext-encoding-utf-8@1.0.2(transitive)
+ Addeduri-js@4.4.1(transitive)
+ Addedutil-deprecate@1.0.2(transitive)