Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

jubatus

Package Overview
Dependencies
Maintainers
1
Versions
26
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

jubatus - npm Package Compare versions

Comparing version 0.6.4 to 0.7.0

.vscode/launch.json

31

index.js

@@ -12,6 +12,2 @@ /*jslint node: true */

function toArray(args) {
return Array.prototype.slice.call(args);
}
function camelCase(input) {

@@ -76,30 +72,15 @@ return input.toLowerCase().replace(/_(.)/g, function(match, group1) {

.reduce((constructor, [ rpcName, methodName, assertParams, assertReturn ]) => {
constructor.prototype[methodName] = function () {
constructor.prototype[methodName] = function (...params) {
const self = this,
client = self.getClient(),
params = toArray(arguments),
hasCallback = (typeof params[params.length - 1] === 'function');
callback = (typeof params[params.length - 1] === 'function') && params.pop();
assertParams(params);
params.unshift(self.getName());
if (hasCallback) {
const callback = params.pop();
if (callback) {
client.call(rpcName, params, (error, result, msgid) => {
if (error) {
callback.call(self, new Error(`${ error } ${ result || '' }`), null, msgid);
} else {
assertReturn(result);
callback.call(self, null, result, msgid);
}
if (!error) { assertReturn(result); }
callback.call(self, error, result, msgid);
});
} else {
return new Promise((resolve, reject) => {
client.call(rpcName, params, (error, result, msgid) => {
if (error) {
reject(new Error(`${ error } ${ result || '' }`));
} else {
assertReturn(result);
resolve([ result, msgid ]);
}
});
});
return client.call(rpcName, params);
}

@@ -106,0 +87,0 @@ };

const util = require('util');
module.exports = function (label = '') {
const { env: { DEBUG } } = (global.process || { env: {} });
const enabled = (DEBUG || '').indexOf(label) > -1;
const debug = util.debuglog(label);
const enabled = debug.toString() !== (function () {}).toString();
Object.defineProperty(debug, 'enabled', { get() { return enabled; } });
return debug;
};

@@ -1,7 +0,17 @@

const msgpack = require('msgpack-js'),
Stream = require('msgpack').Stream,
/*
* MessagePack-RPC Implementation
* ==============================
*
* ## MessagePack-RPC Specification ##
*
* See also:
* - https://github.com/msgpack-rpc/msgpack-rpc/blob/master/spec.md
* - http://frsyuki.hatenablog.com/entry/20100406/p1
*/
const msgpack = require('msgpack-lite'),
debug = require('./debug')('jubatus-node-client:lib:msgpack-rpc'),
assert = require('assert'),
events = require('events'),
net = require('net'),
events = require('events'),
util = require('util');

@@ -12,78 +22,87 @@

let msgid = 0;
return {
next() {
return (msgid = (msgid < MAX ? msgid + 1 : 0));
}
};
return { next() { return (msgid = (msgid < MAX ? msgid + 1 : 0)); } };
}());
function Client(socket) {
assert(socket instanceof net.Socket, 'Illegal argument');
function createEncodeStream(options = { codec: msgpack.createCodec({ useraw: true }) }) {
return msgpack.createEncodeStream(options);
}
function Client(port, host, timeout) {
events.EventEmitter.call(this);
let callbacks = {},
port = socket.remotePort,
host = socket.remoteAddress;
const self = this,
receive = function receive(response) {
if (debug.enabled) { debug(`received message: ${ util.inspect(response, false, null, true) }`); }
const self = this;
const socketEvents = [ 'connect', 'end', 'timeout', 'drain', 'error', 'close' ];
const [ type, msgid, error, result ] = response,
callback = (callbacks[msgid] || function () {});
callback.call(self, error, result, msgid);
delete callbacks[msgid];
},
stream = new Stream(socket).on('msg', receive),
send = function send(request) {
const buf = msgpack.encode(request);
return socket.write(buf, function () {
if (debug.enabled) { debug(`sent message: ${ util.inspect(request, false, null, true) }`); }
function send(message, callback = function () {}) {
const socket = net.connect(port, host);
socket.setTimeout(timeout);
debug({ socket });
socketEvents.forEach(eventName => {
socket.on(eventName, (...args) => {
debug(`socket event [${ eventName }]`);
self.emit.apply(self, [eventName].concat(args));
});
},
ready = function ready() {
if (self.closed) { throw new Error('closed'); }
if (socket.destroyed) { socket.connect(port, host); }
},
socketEvents = [ 'connect', 'end', 'timeout', 'drain', 'error', 'close' ];
});
socketEvents.forEach(function (eventName) {
socket.on(eventName, function () {
debug(`socket event [${ eventName }]`);
const args = [eventName].concat(Array.prototype.slice.call(arguments));
self.emit.apply(self, args);
if (message[0] === 0) {
socket.pipe(msgpack.createDecodeStream()).on('data', message => {
if (debug.enabled) { debug(`received message: ${ util.inspect(message, false, null, true) }`); }
socket.end();
const [ type, msgid, error, result ] = message; // Response message
assert.equal(type, 1);
assert.equal(msgid, message[1]);
callback.call(self, error, result, msgid);
});
}
const encodeStream = createEncodeStream();
encodeStream.pipe(socket);
encodeStream.write(message, (...args) => {
if (debug.enabled) { debug(`sent message: ${ util.inspect(message, false, null, true) }`); }
if (message[0] === 2) { callback.apply(self, args); }
});
encodeStream.end();
};
Object.defineProperty(this, 'send', {
get() { return send; },
enumerable: false
});
socket.once('connect', function onConnect() {
host = this.remoteAddress;
port = this.remotePort;
// It is left for compatibility with v0.6 or earlier.
Object.defineProperty(this, 'close', {
get() { return (() => {}); }
});
debug({ socket });
}
this.closed = socket.destroyed;
this.close = function close() {
socket.end();
this.closed = true;
};
this.call = function call(method, params, callback) {
ready();
const msgid = msgidGenerator.next(),
request = [0, msgid, method, [].concat(params)];
callbacks[msgid] = callback;
send(request);
};
this.notify = function notify(method, params) {
ready();
send([2, method, [].concat(params)]);
};
function _call(type, method, params, callback) {
const message = [ type ].concat(type === 0 ? msgidGenerator.next() : [], [ method, [].concat(params) ] );
if (callback) {
this.send(message, callback);
} else {
return new Promise((resolve, reject) => {
this.send(message, (error, ...args) => {
if (error) { reject(error); } else { resolve(args); }
});
});
}
}
function request(method, params, callback) {
return _call.call(this, 0, method, params, callback);
}
function notify(method, params, callback) {
return _call.call(this, 2, method, params, callback);
}
Client.prototype.request = request;
Client.prototype.call = request; // It is left for compatibility with v0.6 or earlier.
Client.prototype.notify = notify;
util.inherits(Client, events.EventEmitter);
exports.Client = Client;
exports.createClient = function createClient(port = 9190, host = 'localhost', timeout = 0) {
exports.createClient = function createClient(port = 9199, host = 'localhost', timeout = 0) {
debug({ port, host, timeout });
const socket = net.connect(port, host);
socket.setTimeout(timeout);
return new Client(socket);
return new Client(port, host, timeout);
};

@@ -94,13 +113,14 @@

server.on('connection', function onConnection(socket) {
const stream = new Stream(socket).on('msg', function onMsg(request) {
debug(request);
if (request[0] === 0) {
const [ type, msgid, method, params ] = request;
const callback = (error, result) => {
const response = [ 1, msgid, error, [].concat(result) ];
socket.write(msgpack.encode(response));
};
server.emit(method, params, callback);
socket.pipe(msgpack.createDecodeStream()).on('data', message => {
debug(message);
if (message[0] === 0) {
const [ type, msgid, method, params ] = message; // Request message
server.emit(method, params, (error, result) => {
const encodeStream = createEncodeStream();
encodeStream.pipe(socket);
encodeStream.write([ 1, msgid, error, [].concat(result) ]); // Response message
encodeStream.end();
});
} else {
const [ type, method, params ] = request;
const [ type, method, params ] = message; // Notification message
server.emit(method, params);

@@ -107,0 +127,0 @@ }

{
"name": "jubatus",
"version": "0.6.4",
"version": "0.7.0",
"homepage": "https://github.com/naokikimura/jubatus-node-client",

@@ -22,4 +22,3 @@ "main": "./index.js",

"json-schema": "^0.2.3",
"msgpack": "^1.0.2",
"msgpack-js": "^0.3.0"
"msgpack-lite": "^0.1.26"
},

@@ -26,0 +25,0 @@ "readme": "README.md",

@@ -31,6 +31,6 @@ const expect = require('chai').expect;

client = rpc.createClient(port);
client.call('foo', [ 1, 2, 3], (error, response) => {
return client.request('foo', [ 1, 2, 3]);
}).then(([ response ]) => {
expect(response).to.have.ordered.members([ 'bar' ]);
done();
});
}).catch(done);

@@ -37,0 +37,0 @@ });

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc