Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

cypher-stream

Package Overview
Dependencies
Maintainers
1
Versions
26
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cypher-stream - npm Package Compare versions

Comparing version 0.2.0 to 0.2.1

normalize-query-statement.js

59

CypherStream.js

@@ -5,2 +5,3 @@ var oboe = require('oboe');

var urlParser = require('url');
var normalize = require('./normalize-query-statement');

@@ -31,5 +32,14 @@ util.inherits(CypherStream, Readable);

Readable.call(this, { objectMode: true });
statements = normalize(statements).filter(function (statement) {
if(statement.commit) {
options.commit = true;
}
if(statement.rollback) {
options.rollback = true;
}
return statement.statement;
});
// if a rollback is requested before a transactionId is acquired, we can just stop here.
if(!options.transactionId && options.rollback) {
// if a rollback is requested before a transactionId is acquired, we can quit early.
if(options.rollback && !options.transactionId) {
this.push(null);

@@ -39,22 +49,5 @@ return this;

// Normalize various statement syntaxes to [ { statement: statement, parameters: parameters }]
// { statement: "statement" }
if (!(statements instanceof Array) && typeof statements === 'object') {
statements = [statements];
}
// "statement"
if (typeof statements === 'string') {
statements = [ { statement: statements } ];
}
// ["statement"]
if (statements instanceof Array && typeof statements[0] === 'string') {
statements = statements.map(function (statement) {
return { statement: statement };
});
}
if (!(statements instanceof Array) && !options.commit && !options.rollback) {
throw new Error('CypherStream: No statement or commit/rollback request received.');
}
var columns;
var transactionTimeout;
var self = this;

@@ -66,3 +59,2 @@ var headers = {

var transactionTimeout;

@@ -89,6 +81,6 @@ var parsedUrl = urlParser.parse(databaseUrl);

function transactionExpired () {
self.emit('expired');
self.push(null);
self.emit('transactionExpired');
}
// console.log("%s %s", options.transactionId && options.rollback ? 'DELETE': 'POST', url, statements);

@@ -99,6 +91,10 @@ var stream = oboe({

headers : headers,
body : statements ? { statements: statements } : null,
body : { statements: statements },
});
stream.on('start', function (status, headers) {
stream.node('!.*', function CypherStreamAll(){
return oboe.drop; // discard records as soon as they're processed to conserve memory
});
stream.on('start', function CypherStreamStart(status, headers) {
if (headers.location) {

@@ -109,6 +105,7 @@ self.emit('transactionId', headers.location.split('/').pop());

stream.node('!transaction.expires', function (result, path, ancestors) {
stream.node('!transaction.expires', function CypherStreamTransactionExpires(date, path, ancestors) {
clearTimeout(transactionTimeout);
var timeTillExpire = Date.parse('Sun, 19 Oct 2014 05:06:47')-Date.now();
transactionTimeout = setTimeout(transactionExpired, timeTillExpire);
var timeTillExpired = Date.parse(date)-Date.now();
transactionTimeout = setTimeout(transactionExpired, timeTillExpired);
self.emit('expires', date);
});

@@ -129,3 +126,3 @@

stream.done(function CypherStreamDone(complete) {
stream.on('done', function CypherStreamDone(complete) {
clearTimeout(transactionTimeout);

@@ -138,3 +135,3 @@ if (options && options.commit || options.rollback) {

stream.node('!errors[*]', function (error, path, ancestors) {
stream.node('!errors[*]', function CypherStreamHandleError(error, path, ancestors) {
var message = "Query Failure";

@@ -149,3 +146,3 @@ if (error.message) {

stream.fail(function CypherStreamHandleError(error) {
stream.on('fail', function CypherStreamHandleFailure(error) {
// handle non-neo4j errors

@@ -152,0 +149,0 @@ if (!error.jsonBody) {

{
"name": "cypher-stream",
"version": "0.2.0",
"version": "0.2.1",
"description": "Streams cypher query results in a clean format",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -51,2 +51,3 @@ # cypher-stream [![Build Status](https://travis-ci.org/brian-gates/cypher-stream.png?branch=master)](https://travis-ci.org/brian-gates/cypher-stream) [![NPM version](https://badge.fury.io/js/cypher-stream.png)](http://badge.fury.io/js/cypher-stream) [![devDependency Status](https://david-dm.org/brian-gates/cypher-stream.png?theme=shields.io)](https://david-dm.org/brian-gates/cypher-stream.png#info=devDependencies)

Transactions are duplex streams that allow you to write query statements then commit or roll back the written queries.

@@ -109,8 +110,7 @@

### Syncronous Batching
### Query Batching
Transactions automatically batch syncronous writes garnering significant performance gains.
Transactions automatically batch queries for significant performance gains. Try the following:
``` js
var results = 0;
var queriesToRun = 10000;

@@ -120,10 +120,4 @@ var queriesWritten = 0;

.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
console.log(result);
})
.on('error', shouldNotError)
.on('end', function() {
results.should.eql(queriesToRun);
done();
})
;

@@ -130,0 +124,0 @@ while (queriesWritten++ < queriesToRun) {

@@ -63,2 +63,23 @@ var should = require('should');

it('handles accepts a variety of statement formats', function (done) {
var results = 0;
var transaction = cypher.transaction()
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function() {
results.should.eql(6);
done();
})
;
var query = 'match (n:Test) return n limit 1';
transaction.write(query);
transaction.write({ statement: query });
transaction.write([ query, query ]);
transaction.write([ { statement: query }, { statement: query } ]);
transaction.commit();
});
it('handles write and commit', function (done) {

@@ -176,49 +197,71 @@ var results = 0;

// TODO fix this
it.skip('handles transaction expiration', function (done) {
var results = 0;
it('emits expiration', function (done) {
var called = false;
var transaction = cypher.transaction()
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
.on('error', shouldNotError)
.on('expires', function (date) {
called = true;
})
.on('error', shouldNotError)
.on('end', function() {
results.should.eql(1);
.on('end', function () {
called.should.equal(true);
done();
})
;
timekeeper.travel(new Date(Number(new Date())+100000000000));
transaction.resume();
transaction.write('match (n:Test) return n limit 1');
transaction.commit();
});
it.skip('batches async requests with specified debounceTime', function (done) {
var results = 0;
var queriesToRun = 1000;
var queriesWritten = 0;
var transaction = cypher.transaction()
// var transaction = cypher.transaction()
.on('data', function (result) {
results++;
result.should.eql({ n: { test: true } });
})
.on('error', shouldNotError)
.on('end', function() {
results.should.eql(queriesToRun);
done();
})
;
var interval = setInterval(function () {
if (queriesWritten >= queriesToRun) {
clearTimeout(interval);
transaction.commit();
return;
}
setTimeout(function() {
transaction.write('match (n:Test) return n limit 1');
queriesWritten++;
}, 1);
transaction.commit();
}, 0);
});
it.skip('handles expiration', function (done) {
var serverTimeout = 60; // set this equal to neo4j-server.properties -> org.neo4j.server.transaction.timeout (default 60)
var errorCalled = false;
var expiresCalled = false;
var expiredCalled = false;
var transaction = cypher.transaction();
this.timeout((serverTimeout+10)*1000);
transaction.on('expires', function (date) {
expiresCalled = true;
});
transaction.on('expired', function () {
expiredCalled = true;
});
transaction.on('error', function (error) {
errorCalled = true;
error.neo4j.should.eql({
statusCode: 400,
errors: [{
code : 'Neo.ClientError.Transaction.UnknownId',
message : 'Unrecognized transaction id. Transaction may have timed out and been rolled back.'
}],
results: []
});
});
transaction.on('end', function () {
errorCalled .should.equal(true);
expiresCalled.should.equal(true);
expiredCalled.should.equal(true);
done();
});
transaction.resume();
transaction.write('match (n:Test) return n limit 1');
setTimeout(function() {
transaction.write('match (n:Test) return n limit 1');
transaction.commit();
}, ((serverTimeout+5)*1000));
});
});

@@ -1,4 +0,5 @@

var Duplex = require('stream').Duplex;
var util = require('util');
var CypherStream = require('./CypherStream');
var Duplex = require('stream').Duplex;
var util = require('util');
var CypherStream = require('./CypherStream');
var normalize = require('./normalize-query-statement');

@@ -18,11 +19,11 @@ util.inherits(TransactionStream, Duplex);

this.commit = function () {
return this.write({ commit: true });
return self.write({ commit: true });
};
this.rollback = function () {
return this.write({ rollback: true });
return self.write({ rollback: true });
};
function processChunk(input, encoding, callback) {
var statements = normalizeStatementInput(input);
var statements = normalize(input);
var callbacks = [callback];

@@ -43,3 +44,3 @@ var options = {};

var buffered = buffer.shift();
var bufferedStatements = normalizeStatementInput(buffered.chunk);
var bufferedStatements = normalize(buffered.chunk);
if (bufferedStatements) {

@@ -56,2 +57,3 @@ statements = statements.concat(bufferedStatements);

}
var stream = new CypherStream(url, statements, options);

@@ -65,12 +67,18 @@

// stream.on('transactionComplete', function () {
// self.push(null);
// });
stream.on('expires', function (date) {
self.emit('expires', date);
});
stream.on('transactionExpired', function () {
self.emit('expired');
});
stream.on('data', function (data) {
self.push(data);
});
stream.on('error', function (errors) {
self.emit('error', errors);
});
stream.on('end', function () {

@@ -84,2 +92,3 @@ callbacks.forEach(function (callback) {

});
}

@@ -96,25 +105,2 @@

function normalizeStatementInput(input) {
// "statement"
if (typeof input === 'string') {
return [{ statement: input }];
}
// ["statement"]
if (input instanceof Array && typeof input[0] === 'string') {
return input.map(normalizeStatementInput);
}
// [{ statement: "statement" }]
if (input instanceof Array && typeof input[0] === 'object') {
return input;
}
// { statment: "statement" }
if (input.statement) {
return [{
statement : input.statement,
parameters : input.parameters,
}];
}
}
module.exports = TransactionStream;
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc