Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

cypher-stream

Package Overview
Dependencies
Maintainers
1
Versions
26
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cypher-stream - npm Package Compare versions

Comparing version 0.3.0 to 1.0.0-alpha

test/waitforneo4j

252

CypherStream.js
'use strict';
var oboe = require('oboe');
var Readable = require('stream').Readable;
var PassThrough = require('stream').PassThrough;
var util = require('util');
var urlParser = require('url');
var normalize = require('./normalize-query-statement');
var moduleInfo = require('./package.json');
var $ = require('highland');
var neo4j = require('neo4j-driver').v1;
var normalize = require('./util/normalize-query-statement');
var observableToStream = require('./util/observable-to-stream');
var R = require('ramda');
var Readable = require('stream').Readable;
var toNative = require('./util/to-native');
util.inherits(CypherStream, Readable);
var compose = R.compose;
var cond = R.cond;
var curry = R.curry;
var has = R.has;
var map = R.map;
var prop = R.prop;
// Options:
// - transactionId: string ID for the current transaction, if there is one.
// - commit: true if this query should be committed, whether it's in a
// transaction or not.
// - rollback: true if this transaction should be rolled back. Implies that
// `commit` is *not* true, and that a `transactionId` is set.
// - metadata: true if node & relationship metadata should be returned too,
// not just property data. (This translates to Neo4j's REST format.)
// - headers: dictionary of headers to include in this request.
function CypherStream(databaseUrl, statements, options) {
Readable.call(this, { objectMode: true });
statements = normalize(statements).filter(function (statement) {
// Support passing in options within statement object:
if(statement.commit) {
options.commit = true;
delete statement.commit;
}
if(statement.rollback) {
options.rollback = true;
delete statement.rollback;
}
if(statement.metadata) {
options.metadata = true;
delete statement.metadata;
}
if(statement.headers) {
options.headers = statement.headers;
delete statement.headers;
}
// But only count this statement object if it actually has a statement:
return !!statement.statement;
});
// var tap = R.tap;
// var log = tap(console.log.bind(console));
// if a rollback is requested before a transactionId is acquired, we can quit early.
if(options.rollback && !options.transactionId) {
this.push(null);
return this;
}
// Recursively map Neo4j values
// to their native equivalants
// if metadata is requested, we need to specify that on each statement:
if (options.metadata) {
statements.forEach(function (statement) {
statement.resultDataContents = ['REST'];
});
}
// session => statement => observable
var run = curry((runner, statement) =>
runner.run(statement.statement, statement.parameters)
);
var columns;
var transactionTimeout;
var self = this;
var headers = {
'X-Stream': true,
'Accept': 'application/json',
'User-Agent': 'cypher-stream/' + moduleInfo.version
};
var currentStatement = 0;
var callbackStream = null;
var runStream = curry(compose(observableToStream, run));
var parsedUrl = urlParser.parse(databaseUrl);
var emitError = R.curry((stream, error) =>
stream.emit('error', error)
);
//add HTTP basic auth if needed
if (parsedUrl.auth) {
headers.Authorization = 'Basic ' + new Buffer(parsedUrl.auth).toString('base64');
}
var handleNeo4jError = emit => compose(
map(compose(
emit,
error => new neo4j.Neo4jError(error.message, error.code)
)),
prop('fields')
);
//add any custom HTTP headers
for (var key in options.headers || {}) {
headers[key] = options.headers[key];
}
var handleError = emit => cond([
[isNeo4jError, handleNeo4jError(emit)],
[R.T, emit]
]);
if (databaseUrl[databaseUrl.length - 1] !== '/') {
databaseUrl += '/'; // ensure trailing slash
var isNeo4jError = R.has('fields');
class CypherStream extends Readable {
constructor(runner, statements) {
super({ objectMode: true });
this.statements = statements;
this.runner = runner;
this.start();
}
var url = databaseUrl+'db/data/transaction';
if (options && options.transactionId) {
url += '/'+options.transactionId;
}
if (options && options.commit) {
url += '/commit';
}
function transactionExpired () {
self.emit('transactionExpired');
start() {
$([this.statements])
.flatMap(normalize)
.filter(has('statement'))
.flatMap(statement => {
var stream =
runStream(this.runner, statement)
.map(toNative);
if(statement.callback) {
statement.callback(stream.observe());
}
return stream;
})
.errors(handleError(emitError(this)))
.doto(x => this.push(x))
.on('end', () => this.push(null))
.resume()
;
}
// console.log('%s %s', options.transactionId && options.rollback ? 'DELETE': 'POST', url, JSON.stringify(statements));
var stream = oboe({
url : url,
method : options.transactionId && options.rollback ? 'DELETE': 'POST',
headers : headers,
body : { statements: statements },
});
stream.node('!.*', function CypherStreamAll(){
return oboe.drop; // discard records as soon as they're processed to conserve memory
});
stream.on('start', function CypherStreamStart(status, headers) {
if (headers.location) {
self.emit('transactionId', headers.location.split('/').pop());
}
});
stream.node('!transaction.expires', function CypherStreamTransactionExpires(date) {
clearTimeout(transactionTimeout);
var timeTillExpired = Date.parse(date)-Date.now();
transactionTimeout = setTimeout(transactionExpired, timeTillExpired);
self.emit('expires', date);
});
stream.path('!results[*]', function CypherStreamResult() {
if (callbackStream) {
self.unpipe(callbackStream);
callbackStream.end();
callbackStream = null;
}
if (statements[currentStatement].callback) {
callbackStream = new PassThrough({ objectMode: true });
statements[currentStatement].callback(callbackStream);
}
currentStatement++;
});
stream.node('!results[*].columns', function CypherStreamNodeColumns(c) {
self.emit('columns', c);
columns = c;
});
var dataSelector = '!results[*].data[*].' + (options.metadata ? 'rest' : 'row');
stream.node(dataSelector, function CypherStreamNodeData(result) {
var data = {};
columns.forEach(function (column, i) {
data[column] = result[i];
});
if (callbackStream) {
callbackStream.write(data);
}
self.push(data);
});
stream.on('done', function CypherStreamDone() {
clearTimeout(transactionTimeout);
if (options && options.commit || options.rollback) {
self.emit('transactionComplete');
}
if (callbackStream) {
callbackStream.end();
}
self.push(null);
});
stream.node('!errors[*]', function CypherStreamHandleError(error) {
var message = 'Query Failure';
if (error.message) {
message += ': ' + error.message;
}
var err = new Error(message);
err.code = error.code;
self.emit('error', err);
});
stream.on('fail', function CypherStreamHandleFailure(error) {
var err;
// handle non-neo4j errors
if (!error.jsonBody) {
// pass the Error instance through, creating one if necessary
err = error.thrown || new Error('Neo4j ' + error.statusCode);
err.statusCode = error.statusCode;
err.body = error.body;
err.jsonBody = error.jsonBody;
self.emit('error', err);
self.push(null);
return;
}
// handle neo4j errors
var message = 'Query failure';
var statusCode = 400;
if (error.jsonBody.message) {
message += ': '+error.jsonBody.message;
}
if (error.jsonBody.statusCode) {
statusCode = error.jsonBody.statusCode;
}
err = new Error(message);
err.neo4j = error.jsonBody;
err.neo4j.statusCode = statusCode;
self.emit('error', err);
self.push(null);
});
this._read = function () { };
_read() {}
}
module.exports = CypherStream;

@@ -0,16 +1,44 @@

'use strict';
var CypherStream = require('./CypherStream');
var neo4j = require('neo4j-driver').v1;
var R = require('ramda');
var TransactionStream = require('./TransactionStream');
module.exports = function Connection(url) {
var factory = function CypherStreamFactory(query, params) {
var statements = query;
if (params) {
statements = [ { statement: query, parameters: params } ];
var all = R.all;
var compose = R.compose;
var cond = R.cond;
var isNil = R.isNil;
var not = R.not;
var unapply = R.unapply;
var always = R.always;
var notNil = compose(not, isNil);
// (user, pass) => auth || undefined
var auth = cond([
[unapply(all(notNil)), neo4j.auth.basic ],
[R.T, always(undefined)],
]);
module.exports = function Connection(url, options) {
options = options || {};
var driver = neo4j.driver(url || 'bolt://localhost', auth(options.username, options.password));
var factory = function CypherStreamFactory(statement, parameters) {
if (parameters) {
statement = [ { statement, parameters } ];
}
return new CypherStream(url, statements, { commit: true });
var session = driver.session();
return new CypherStream(session, statement)
.on('end', () => session.close());
};
factory.transaction = function (options) {
return new TransactionStream(url, options);
factory.transaction = options => {
var session = driver.session();
return new TransactionStream(session, options)
.on('end', () => session.close());
};
return factory;
};
{
"name": "cypher-stream",
"version": "0.3.0",
"version": "1.0.0-alpha",
"description": "Streams cypher query results in a clean format",

@@ -25,3 +25,6 @@ "main": "index.js",

"dependencies": {
"oboe": "^2.1.1"
"highland": "^2.9.0",
"neo4j-driver": "^1.0.3",
"oboe": "^2.1.1",
"ramda": "^0.21.0"
},

@@ -28,0 +31,0 @@ "devDependencies": {

'use strict';
var should = require('should');
var cypher = require('../index')('http://localhost:7474');
var http = require('http');
var cypher = require('../index')('bolt://0.0.0.0');
var should = require('should');
var R = require('ramda');
function shouldNotError(error) {
should.not.exist(error);
}
var shouldNotError = error => should.not.exist(error);
describe('Cypher stream', function () {
describe('Cypher stream', () => {
var testRecordsToCreate = 10;
before(function (done){
before(function(done) {
// Travis CI is slow. Give him more time.

@@ -18,148 +17,122 @@ if (process.env.TRAVIS_CI) {

cypher('FOREACH (x IN range(1,'+testRecordsToCreate+') | CREATE(:Test {test: true}))')
.on('end', done)
.on('error', shouldNotError)
.resume();
.on('end', done)
.on('error', shouldNotError)
.resume();
});
after(function (done){
after(done => {
cypher('MATCH (n:Test) DELETE n')
.on('end', done)
.on('error', shouldNotError)
.resume();
.on('end', done)
.on('error', shouldNotError)
.resume();
});
it('works', function (done) {
it('works', done => {
var results = 0;
cypher('match (n:Test) return n limit 10')
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function () {
results.should.eql(10);
done();
})
.on('data', result => {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', () => {
results.should.eql(10);
done();
})
;
});
it('handles errors', function (done) {
it('handles errors', done => {
var errored = false;
cypher('invalid query')
.on('error', function (error) {
errored = true;
String(error).should.equal('Error: Query Failure: Invalid input \'i\': expected <init> (line 1, column 1)\n"invalid query"\n ^');
})
.on('end', function () {
errored.should.be.true;
done();
})
.resume() // need to manually start it since we have no on('data')
.on('error', error => {
errored = true;
should.equal(
error.code,
'Neo.ClientError.Statement.SyntaxError'
);
should.equal(
error.message,
'Invalid input \'i\': expected <init> (line 1, column 1 (offset: 0))\n"invalid query"\n ^'
);
})
.on('end', () => {
should.equal(true, errored);
done();
})
.resume() // need to manually start it since we have no on('data')
;
});
it('handles non-neo4j errors', function (done) {
var errored = false;
var expectedError = new Error('Test');
cypher('match (n:Test) return n limit 1')
.on('data', function () {
throw expectedError;
})
.on('error', function (error) {
errored = true;
error.should.equal(expectedError);
})
.on('end', function () {
errored.should.be.true;
done();
})
;
});
it('returns non-object values', function (done) {
it('returns non-object values', done => {
cypher('match (n:Test) return n.test as test limit 1')
.on('data', function (result) {
result.should.eql({ test: true });
})
.on('error', shouldNotError)
.on('end', done)
.on('data', result => result.should.eql({ test: true }))
.on('error', shouldNotError)
.on('end', done)
;
});
it('returns collections', function (done) {
it('returns collections', done => {
cypher('match (n:Test) return collect(n) as nodes limit 1')
.on('data', function (result) {
// 10x { test: true }
result.should.eql({ nodes: [{ test: true }, { test: true }, { test: true }, { test: true }, { test: true }, { test: true }, { test: true }, { test: true }, { test: true }, { test: true } ] });
})
.on('error', shouldNotError)
.on('end', done)
.on('data', result => {
result.should.eql({
nodes: R.times(R.always({ test: true }), 10)
});
})
.on('error', shouldNotError)
.on('end', done)
;
});
it('returns non-node collections', function (done) {
it('returns non-node collections', done => {
cypher('match (n:Test) return labels(n) as labels limit 1')
.on('data', function (result) {
result.should.eql({ labels: ['Test']});
})
.on('error', shouldNotError)
.on('end', done)
.on('data', result => result.should.eql({ labels: ['Test']}))
.on('error', shouldNotError)
.on('end', done)
;
});
it('recursively returns data values', function (done) {
cypher('match (n:Test) return { child: { grandchild: n }} as parent limit 1')
.on('data', function (result) {
result.should.eql({ parent: { child: { grandchild: { test: true } } } });
})
.on('error', shouldNotError)
.on('end', done)
it('returns relationships', done => {
cypher(`
CREATE ()-[a:somerel { foo: 'bar' }]->()
RETURN a
`)
.on('data', result => result.should.eql({ a: { foo: 'bar' } }))
.on('error', shouldNotError)
.on('end', done)
;
});
it('handles null', function (done) {
cypher('return null')
.on('data', function (result) {
result.should.eql({ 'null': null });
})
.on('error', shouldNotError)
.on('end', done)
it('recursively returns data values', done => {
cypher('match (n:Test) return { child: { grandchild: n }} as parent limit 1')
.on('data', result => result.should.eql({ parent: { child: { grandchild: { test: true } } } }))
.on('error', shouldNotError)
.on('end', done)
;
});
it('works with trailing slash', function (done){
var cyp = require('../index')('http://localhost:7474/');
cyp('match (n:Test) return n limit 1')
.on('error', shouldNotError)
.on('end', done)
.resume()
it('handles null', done => {
cypher('return null')
.on('data', result => result.should.eql({ 'null': null }))
.on('error', shouldNotError)
.on('end', done)
;
});
it('works with basic http auth', function (done){
var cyp = require('../index')('http://neo:cypher@localhost:7474/');
cyp('match (n:Test) return n limit 1')
.on('error', shouldNotError)
.on('end', done)
.resume()
;
});
it('works with parameters', function (done) {
it('works with parameters', done => {
var results = 0;
cypher('match (n:Test) where n.test={test} return n limit 1', { test: true })
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function () {
results.should.eql(1);
done();
})
.on('data', result => {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', () => {
results.should.eql(1);
done();
})
;
});
it('handles accepts a variety of statement formats', function (done) {
it('handles accepts a variety of statement formats', done => {
var results = 0;

@@ -172,4 +145,4 @@ var query = 'match (n:Test) return n limit 1';

cypher([ { statement: query }, { statement: query } ]),
].forEach(function (stream) {
stream.on('data', function (result) {
].forEach(stream => {
stream.on('data', result => {
results++;

@@ -184,3 +157,3 @@ result.should.eql({ n: { test: true } });

it('calls statement callbacks', function (done) {
it('calls statement callbacks', done => {
var results = 0;

@@ -190,16 +163,17 @@ var calls = 0;

var query = 'match (n:Test) return n limit 2';
function callback(stream) {
var callback = stream => {
stream
.on('data', function (result) {
result.should.eql({ n: { test: true } });
results++;
})
.on('end', function () {
ended++;
})
.on('data', result => {
result.should.eql({ n: { test: true } });
results++;
})
.on('end', () => {
ended++;
})
;
calls++;
}
};
var statement = { statement: query, callback: callback };
cypher([ statement, statement ]).on('end', function () {
cypher([ statement, statement ])
.on('end', () => {
calls.should.equal(2);

@@ -209,81 +183,6 @@ ended.should.equal(2);

done();
}).resume();
});
it('supports node/rel metadata', function (done) {
var results = 0;
cypher({
statement: 'match (n:Test) return n limit 1',
metadata: true
})
.on('data', function (result) {
results++;
result.should.be.type('object');
result.n.should.be.type('object');
result.n.data.should.eql({ test: true });
result.n.self.should.be.type('string');
result.n.metadata.should.be.type('object');
result.n.metadata.id.should.be.type('number');
result.n.metadata.labels.should.eql(['Test']);
})
.on('error', shouldNotError)
.on('end', function () {
results.should.eql(1);
done();
})
;
.resume();
});
it('supports custom headers', function (done) {
var headers = {
'X-Foo': 'Bar',
'x-lorem': 'ipsum'
};
// for this test, use a mock server to test that headers were actually sent,
// but then proxy the request to Neo4j afterward.
// TODO: use https://github.com/pgte/nock? not sure how to check request
// headers though (not just match them).
var server = http.createServer(function (req, res) {
req.headers['x-foo'].should.equal('Bar');
req.headers['x-lorem'].should.equal('ipsum');
// proxy request to real Neo4j server:
req.pipe(http.request({
hostname: 'localhost',
port: 7474,
method: req.method,
path: req.url,
headers: req.headers
}, function (neo4jRes) {
res.writeHead(neo4jRes.statusCode, neo4jRes.headers);
neo4jRes.pipe(res);
}));
// and shut down the server now (so Node can cleanly exit):
server.close();
});
server.listen(0, function () {
var url = 'http://localhost:' + server.address().port;
var cypher = require('../')(url);
var results = 0;
cypher({
statement: 'match (n:Test) return n limit 1',
headers: headers
})
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function () {
results.should.eql(1);
done();
})
;
});
});
});
'use strict';
var should = require('should');
var normalize = require('../normalize-query-statement');
var normalize = require('../util/normalize-query-statement');

@@ -5,0 +5,0 @@ describe('Query statement normalization', function () {

'use strict';
var should = require('should');
var cypher = require('../index')('http://localhost:7474');
var http = require('http');
var should = require('should');
var cypher = require('../index')('bolt://0.0.0.0');

@@ -11,2 +10,3 @@ function shouldNotError(error) {

describe('Transaction', function () {
var testRecordsToCreate = 10;

@@ -19,25 +19,54 @@ before(function (done){

cypher('FOREACH (x IN range(1,'+testRecordsToCreate+') | CREATE(:Test {test: true}))')
.on('end', done)
.on('error', shouldNotError)
.resume();
.on('end', done)
.on('error', shouldNotError)
.resume();
});
after(function (done){
after(done =>
cypher('MATCH (n:Test) DELETE n')
.on('end', done)
.on('error', shouldNotError)
.resume();
.on('end', done)
.on('error', shouldNotError)
.resume()
);
it('cannot write after commit (throw)', done => {
var tx = cypher.transaction();
tx.commit();
try {
tx.write('match (n:Test) return n limit 1');
} catch (error) {
should.equal(
'Error: Cannot write after commit.',
String(error)
);
}
tx.on('end', done);
tx.resume();
});
it('cannot write after commit (emit)', done => {
var tx = cypher.transaction();
tx.commit();
tx.on('error', error =>
should.equal(
'Error: Cannot write after commit.',
String(error)
)
);
tx.on('end', done);
tx.resume();
});
it('works', function (done) {
var results = 0;
var transaction = cypher.transaction()
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function() {
results.should.eql(1);
done();
})
.on('data', result => {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function() {
results.should.eql(1);
done();
})
;

@@ -48,14 +77,116 @@ transaction.write('match (n:Test) return n limit 1');

context('data written within a transaction', () => {
var tx;
beforeEach(() => {
tx = cypher.transaction();
tx.write('create (n:NewItem { foo: "bar"}) return n');
});
afterEach(done =>
cypher(`
MATCH (n:NewItem)
DETACH DELETE n
`)
.on('end', done)
.resume()
);
it('is unavailable to parallel transactions', done => {
var tx2 = cypher.transaction();
var called = false;
tx.on('data', () => {
tx2.write('MATCH (n:NewItem) return n');
tx2.on('data', () => called = true);
tx2.on('end', () => {
should.equal(false, called);
tx.rollback();
});
tx2.commit();
});
tx.on('end', done);
tx2.resume();
tx.resume();
});
// TODO: look into this.
it.skip('Uncaught Read operations are not allowed for `NONE` transactions', done => {
var tx2 = cypher.transaction();
var called = false;
tx.on('data', () => {
console.log('here');
tx2.write('MATCH (n:NewItem) return n');
tx2.on('data', () => called = true);
tx2.on('end', () => should.equal(false, called));
});
tx2.on('end', done);
tx2.resume();
tx2.commit();
tx.resume();
// tx.commit();
});
it('is available after commit', done => {
var tx = cypher.transaction();
var numToCreate = 10;
for(var i = 0; i < numToCreate; i++) {
tx.write('create (n:NewItem { foo: "bar"}) return n');
}
var numResults = 0;
tx.on('end', () => {
cypher('match (n:NewItem) return n')
.on('data', data => {
should.equal('bar', data.n.foo);
numResults++;
})
.on('end', () => {
should.equal(numToCreate, numResults);
done();
});
});
tx.commit();
tx.resume();
});
it('is unavailable outside the transaction before commit', done => {
var tx = cypher.transaction();
tx.write('create (n:NewItem { foo: "bar"}) return n');
tx.on('data', () => {
var called = false;
cypher('match (n:NewItem) return n')
.on('data', () => called = true)
.on('end', () => {
should.equal(false, called);
tx.commit();
});
});
tx.on('end', done);
tx.resume();
});
});
it('handles multiple writes', function (done) {
var results = 0;
var transaction = cypher.transaction()
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function() {
results.should.eql(2);
done();
})
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function() {
results.should.eql(2);
done();
})
;

@@ -67,14 +198,14 @@ transaction.write('match (n:Test) return n limit 1');

it('handles accepts a variety of statement formats', function (done) {
it('accepts a variety of statement formats', function (done) {
var results = 0;
var transaction = cypher.transaction()
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function() {
results.should.eql(6);
done();
})
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function() {
results.should.eql(6);
done();
})
;

@@ -92,11 +223,11 @@ var query = 'match (n:Test) return n limit 1';

var transaction = cypher.transaction()
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function() {
results.should.eql(1);
done();
})
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function() {
results.should.eql(1);
done();
})
;

@@ -106,47 +237,20 @@ transaction.write({ statement: 'match (n:Test) return n limit 1', commit: true });

it('automatically batches queries for performance', function (done) {
// results may vary, depending on your system.
// tests on macbook pro were around ~100ms
// Travis CI is slow. Give him more time.
if (process.env.TRAVIS_CI) {
this.timeout(5000);
}
var results = 0;
var queriesToRun = 1000;
var queriesWritten = 0;
var transaction = cypher.transaction()
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function() {
results.should.eql(queriesToRun);
done();
})
;
while (queriesWritten++ < queriesToRun) {
transaction.write('match (n:Test) return n limit 1');
}
transaction.commit();
});
it('can eagerly rollback if queries are still buffered', function (done) {
var results = 0;
var transaction = cypher.transaction()
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function () {
results.should.eql(0);
cypher('match (n:Test) where n.foo = "bar" or n.bar = "baz" return count(n) as count')
.on('data', function (result) {
result.count.should.equal(0);
done();
})
.on('error', shouldNotError)
;
})
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function () {
results.should.eql(0);
cypher('match (n:Test) where n.foo = "bar" or n.bar = "baz" return count(n) as count')
.on('data', result => {
result.count.should.equal(0);
})
.on('end', done)
.on('error', shouldNotError)
;
})
;

@@ -162,16 +266,16 @@ transaction.write('match (n:Test) set n.foo = "bar" return n');

var transaction = cypher.transaction()
.on('data', function () {
results++;
})
.on('error', shouldNotError)
.on('end', function () {
results.should.eql(testRecordsToCreate*2);
cypher('match (n:Test) where n.foo = "bar" or n.bar = "baz" return count(n) as count')
.on('data', function (result) {
result.count.should.equal(0);
done();
})
.on('error', shouldNotError)
;
})
.on('data', function () {
results++;
})
.on('error', shouldNotError)
.on('end', function () {
results.should.eql(testRecordsToCreate*2);
cypher('match (n:Test) where n.foo = "bar" or n.bar = "baz" return count(n) as count')
.on('data', function (result) {
result.count.should.equal(0);
done();
})
.on('error', shouldNotError)
;
})
;

@@ -188,11 +292,11 @@ transaction.write('match (n:Test) set n.foo = "bar" return n');

var transaction = cypher.transaction()
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function () {
results.should.eql(1);
done();
})
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function () {
results.should.eql(1);
done();
})
;

@@ -206,72 +310,2 @@ transaction.write({

it('emits expiration', function (done) {
var called = false;
var transaction = cypher.transaction()
.on('error', shouldNotError)
.on('expires', function () {
called = true;
})
.on('end', function () {
called.should.equal(true);
done();
})
;
transaction.resume();
transaction.write('match (n:Test) return n limit 1');
setTimeout(function() {
transaction.write('match (n:Test) return n limit 1');
transaction.commit();
}, 0);
});
it.skip('handles expiration', function (done) {
// set this equal to neo4j-server.properties -> org.neo4j.server.transaction.timeout (default 60)
var serverTimeout = 60;
var errorCalled = false;
var expiresCalled = false;
var expiredCalled = false;
var transaction = cypher.transaction();
this.timeout((serverTimeout+10)*1000);
transaction.on('expires', function () {
expiresCalled = true;
});
transaction.on('expired', function () {
expiredCalled = true;
});
transaction.on('error', function (error) {
errorCalled = true;
error.neo4j.should.eql({
statusCode: 400,
errors: [{
code : 'Neo.ClientError.Transaction.UnknownId',
message : 'Unrecognized transaction id. Transaction may have timed out and been rolled back.'
}],
results: []
});
});
transaction.on('end', function () {
errorCalled .should.equal(true);
expiresCalled.should.equal(true);
expiredCalled.should.equal(true);
done();
});
transaction.resume();
transaction.write('match (n:Test) return n limit 1');
setTimeout(function() {
transaction.write('match (n:Test) return n limit 1');
transaction.commit();
}, ((serverTimeout+5)*1000));
});
it('calls statement callbacks', function (done) {

@@ -284,9 +318,9 @@ var results = 0;

stream
.on('data', function (result) {
result.should.eql({ n: { test: true } });
results++;
})
.on('end', function () {
ended++;
})
.on('data', function (result) {
result.should.eql({ n: { test: true } });
results++;
})
.on('end', function () {
ended++;
})
;

@@ -309,81 +343,2 @@ calls++;

it('supports node/rel metadata', function (done) {
var results = 0;
var transaction = cypher.transaction({ metadata: true })
.on('data', function (result) {
results++;
result.should.be.type('object');
result.n.should.be.type('object');
result.n.data.should.eql({ test: true });
result.n.self.should.be.type('string');
result.n.metadata.should.be.type('object');
result.n.metadata.id.should.be.type('number');
result.n.metadata.labels.should.eql(['Test']);
})
.on('error', shouldNotError)
.on('end', function() {
results.should.eql(1);
done();
})
;
transaction.write('match (n:Test) return n limit 1');
transaction.commit();
});
// NOTE: This support is basic -- assumes only one query per request.
// https://github.com/brian-gates/cypher-stream/issues/10#issuecomment-72090757
it('supports custom headers', function (done) {
var headers = {
'X-Foo': 'Bar',
'x-lorem': 'ipsum'
};
// for this test, use a mock server to test that headers were actually sent,
// but then proxy the request to Neo4j afterward.
// TODO: use https://github.com/pgte/nock? not sure how to check request
// headers though (not just match them).
var server = http.createServer(function (req, res) {
req.headers['x-foo'].should.equal('Bar');
req.headers['x-lorem'].should.equal('ipsum');
// proxy request to real Neo4j server:
req.pipe(http.request({
hostname: 'localhost',
port: 7474,
method: req.method,
path: req.url,
headers: req.headers
}, function (neo4jRes) {
res.writeHead(neo4jRes.statusCode, neo4jRes.headers);
neo4jRes.pipe(res);
}));
// and shut down the server now (so Node can cleanly exit):
server.close();
});
server.listen(0, function () {
var url = 'http://localhost:' + server.address().port;
var cypher = require('../')(url);
var results = 0;
var transaction = cypher.transaction()
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function () {
results.should.eql(1);
done();
})
;
transaction.write({
statement: 'match (n:Test) return n limit 1',
headers: headers
});
transaction.commit();
});
});
});
'use strict';
var Duplex = require('stream').Duplex;
var util = require('util');
var CypherStream = require('./CypherStream');
var normalize = require('./normalize-query-statement');
var $ = require('highland');
var CypherStream = require('./CypherStream');
var Duplex = require('stream').Duplex;
var normalize = require('./util/normalize-query-statement');
// var R = require('ramda');
util.inherits(TransactionStream, Duplex);
// var tap = R.tap;
// var log = tap(console.log.bind(console));
// Options:
// - debounceTime: number of milliseconds to wait between queries to collect and
// batch request them.
// - batchSize: maximimum number of queries to send at a time.
// - metadata: true if node & relationship metadata should be returned too,
// not just property data. (This translates to Neo4j's REST format.)
function TransactionStream(url, options) {
Duplex.call(this, { objectMode: true });
class TransactionStream extends Duplex {
constructor(session) {
super({ objectMode: true });
var self = this;
var buffer = [];
var transactionId;
var debounce;
var debounceTime = options && options.debounceTime || 0;
var batchSize = options && options.batchSize || 10000;
var metadata = options && options.metadata;
this.session = session;
this.tx = session.beginTransaction();
this.statements = $();
this.commit = function commitAlias() {
return self.write({ commit: true });
};
this.writes = this.statements.fork()
.flatMap(normalize)
.map(statement => {
if(statement.commit) {
this.commit();
}
return $(new CypherStream(this.tx, statement));
})
;
this.rollback = function rollbackAlias() {
return self.write({ rollback: true });
};
this.results = this.writes.fork()
.flatten()
.doto(x => this.push(x))
.errors(error => this.emit('error', error))
;
function handle() {
var statements = [];
var callbacks = [];
var options = {};
this.writes.resume();
this.results.resume();
}
if(metadata) {
options.metadata = metadata;
_write(chunk, encoding, callback) {
if(this.rolledBack) {
throw new Error('Cannot write after rollback.');
}
if (transactionId) {
options.transactionId = transactionId;
if(this.committed) {
throw new Error('Cannot write after commit.');
}
while(buffer.length) {
var input = buffer.shift();
statements = statements.concat(normalize(input));
if (input.commit) { options.commit = true; }
if (input.rollback) { options.rollback = true; }
}
this.statements.write(chunk);
callback();
}
// console.log('new CypherStream', url, statements, options);
var stream = new CypherStream(url, statements, options);
_read() { }
stream.on('transactionId', function transactionIdHandler(txId) {
if (!transactionId) {
transactionId = txId;
}
});
commit() {
if(this.committed) {
return;
}
this.committed = true;
stream.on('expires', function expiresHandler(date) {
self.emit('expires', date);
this.writes.on('end', () => {
this.tx.commit()
.subscribe({
onCompleted: () => {
this.emit('comitted');
this.push(null);
},
onError: error => {
this.emit('error', error);
this.push(null);
}
});
});
stream.on('transactionExpired', function transactionExpiredHandler() {
self.emit('expired');
});
this.statements.end();
}
stream.on('data', function dataHandler(data) {
self.push(data);
});
rollback() {
if(this.rolledBack) {
return;
}
this.rolledBack = true;
stream.on('error', function errorHandler(errors) {
self.emit('error', errors);
});
this.statements.end();
this.results.end();
this.writes.end();
stream.on('end', function endHandler() {
if(options.rollback || options.commit) {
self.push(null);
this.tx.rollback()
.subscribe({
onCompleted: () => {
this.push(null);
},
onError: error => {
this.emit('error', error);
this.push(null);
}

@@ -86,17 +98,4 @@ });

this._write = function (chunk, encoding, callback) {
buffer.push(chunk);
if(debounce) { clearTimeout(debounce); }
// debounce to allow writes to buffer
if(buffer.length === batchSize) {
handle();
} else {
debounce = setTimeout(handle, debounceTime);
}
callback();
};
this._read = function () { };
}
module.exports = TransactionStream;

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc