msgpack-rpc-lite
Advanced tools
Comparing version
@@ -23,4 +23,4 @@ "use strict"; | ||
const tr = (object) => object && object.toString().replace(/\s/g, ''); | ||
const equalsIgnoreSpace = (a, b) => tr(a) === tr(b); | ||
// tslint:disable-next-line:only-arrow-functions no-empty | ||
const equalsIgnoreSpace = (lhs, rhs) => tr(lhs) === tr(rhs); | ||
// tslint:disable-next-line:only-arrow-functions no-empty ban-types | ||
const isDoNothingFunction = (fn) => equalsIgnoreSpace(fn, function () { }); | ||
@@ -35,8 +35,13 @@ const enabled = !isDoNothingFunction(debug); | ||
const isFunction = (object) => typeof object === 'function'; | ||
const last = (array) => array[array.length - 1]; | ||
const popUnless = (array, predicate) => predicate(last(array)) ? array.pop() : undefined; | ||
function createMessage(type, method, parameters) { | ||
const params = parameters.slice(0); | ||
const callback = isFunction(params[params.length - 1]) && params.pop(); | ||
const callback = popUnless(params, isFunction); | ||
const message = [].concat(type, type === 0 ? msgidGenerator.next() : [], method, [params]); | ||
return [message, callback]; | ||
} | ||
function createResponseMessage(type, msgid, error, result) { | ||
return [type, msgid, util_1.default.isError(error) ? error.message : error, result]; | ||
} | ||
function parseMessage(message) { | ||
@@ -57,7 +62,6 @@ const msg = message.slice(0); | ||
} | ||
function writeMessage(socket, message, option, listener) { | ||
function writeMessage(socket, message, option) { | ||
const encodeStream = msgpack_lite_1.default.createEncodeStream(option); | ||
encodeStream.pipe(socket); | ||
encodeStream.end(message, listener); | ||
encodeStream.unpipe(socket); | ||
return new Promise(resolve => encodeStream.end(message, resolve)).then(() => void (encodeStream.unpipe(socket))); | ||
} | ||
@@ -93,9 +97,10 @@ /** | ||
if (callback) { | ||
send.call(this, message, callback, undefined); | ||
send(this, message, callback); | ||
} | ||
else { | ||
return new Promise((resolve, reject) => { | ||
const executor = (error, result, msgid) => error ? reject(error) : resolve([result, msgid]); | ||
send.call(this, message, executor, undefined); | ||
}); | ||
const executor = (resolve, reject) => { | ||
const listener = (error, ...response) => error ? reject(error) : resolve(response); | ||
send(this, message, listener); | ||
}; | ||
return new Promise(executor); | ||
} | ||
@@ -112,3 +117,3 @@ } | ||
call(method, ...args) { | ||
return this.request.apply(this, [method, ...args]); | ||
return this.request(method, ...args); | ||
} | ||
@@ -124,6 +129,6 @@ /** | ||
if (callback) { | ||
send.call(this, message, undefined, callback); | ||
send(this, message, undefined, callback); | ||
} | ||
else { | ||
return new Promise(resolve => send.call(this, message, undefined, () => resolve())); | ||
return new Promise(resolve => send(this, message, undefined, () => resolve())); | ||
} | ||
@@ -139,9 +144,9 @@ } | ||
// tslint:disable-next-line:no-empty max-line-length | ||
function send(message, responseListener = () => { }, writeFinishListener = () => { }) { | ||
const socket = net_1.default.createConnection(this.connectOptions, () => { | ||
writeMessage(socket, message, { codec: this.encodeCodec }, () => { | ||
function send(clinet, message, responseListener = () => { }, writeFinishListener = () => { }) { | ||
const socket = net_1.default.createConnection(clinet.connectOptions, () => { | ||
writeMessage(socket, message, { codec: clinet.encodeCodec }).then(() => { | ||
debug.enabled && debug(`sent message: ${util_1.default.inspect(message, false, null, true)}`); | ||
writeFinishListener(); | ||
socket.end(); | ||
}); | ||
socket.end(); | ||
}); | ||
@@ -151,7 +156,7 @@ const socketEvents = ['connect', 'end', 'timeout', 'drain', 'error', 'close']; | ||
debug(`socket event [${eventName}]`); | ||
this.emit.apply(this, [eventName].concat(args)); | ||
clinet.emit(eventName, ...args); | ||
}), socket); | ||
socket.pipe(msgpack_lite_1.default.createDecodeStream({ codec: this.decodeCodec })).on('data', response => { | ||
socket.pipe(msgpack_lite_1.default.createDecodeStream({ codec: clinet.decodeCodec })).on('data', response => { | ||
debug.enabled && debug(`received message from server: ${util_1.default.inspect(response, false, null, true)}`); | ||
const [type, msgid, error, result] = response; // Response message | ||
const { type, msgid, error = null, result } = parseMessage(response); // Response message | ||
debug.enabled && assert_1.default.deepEqual({ type, msgid }, { type: 1, msgid: parseMessage(message).msgid }); | ||
@@ -195,3 +200,3 @@ responseListener(error, result, msgid); | ||
const callback = type === 2 ? undefined : (error, result) => { | ||
const response = [1, msgid, error, result]; // Response message | ||
const response = createResponseMessage(1, msgid, error, result); // Response message | ||
writeMessage(socket, response, { codec: encodeCodec }); | ||
@@ -198,0 +203,0 @@ }; |
{ | ||
"name": "msgpack-rpc-lite", | ||
"version": "0.2.0", | ||
"version": "0.2.1", | ||
"description": "Implementation of MessagePack-RPC with msgpack-lite", | ||
@@ -5,0 +5,0 @@ "main": "./dist/index.js", |
@@ -24,5 +24,5 @@ /* | ||
const tr = (object: any) => object && object.toString().replace(/\s/g, ''); | ||
const equalsIgnoreSpace = (a: any, b: any): boolean => tr(a) === tr(b); | ||
// tslint:disable-next-line:only-arrow-functions no-empty | ||
const isDoNothingFunction = (fn: DebugLog) => equalsIgnoreSpace(fn, function () { }); | ||
const equalsIgnoreSpace = (lhs: any, rhs: any): boolean => tr(lhs) === tr(rhs); | ||
// tslint:disable-next-line:only-arrow-functions no-empty ban-types | ||
const isDoNothingFunction = (fn: Function) => equalsIgnoreSpace(fn, function () { }); | ||
const enabled = !isDoNothingFunction(debug); | ||
@@ -46,13 +46,18 @@ Object.defineProperty(debug, 'enabled', { get() { return enabled; } }); | ||
type Message = RequestMessage | ResponseMessage | NotifyMessage; | ||
type ResponseListener = (error: string, result: any, msgid: number) => void; | ||
type ResponseListener = (error: string | null, result: any, msgid: number) => void; | ||
type WriteFinishListener = () => void; | ||
type SendListener = ResponseListener | WriteFinishListener; | ||
const isFunction = (object: any) => typeof object === 'function'; | ||
const last = (array: any[]) => array[array.length - 1]; | ||
const popUnless = (array: any[], predicate: (object: any) => boolean) => predicate(last(array)) ? array.pop() : undefined; | ||
function createMessage(type: number, method: string, parameters: any[]): [Message, boolean | SendListener] { | ||
function createMessage(type: 0 | 2, method: string, parameters: any[]): [Message, SendListener] { | ||
const params = parameters.slice(0); | ||
const callback = isFunction(params[params.length - 1]) && (params.pop() as SendListener); | ||
const callback: SendListener = popUnless(params, isFunction); | ||
const message = (([] as any[]).concat(type, type === 0 ? msgidGenerator.next() : [], method, [params]) as Message); | ||
return [message, callback]; | ||
} | ||
function createResponseMessage(type: 1, msgid: number, error: string | Error, result: any): ResponseMessage { | ||
return [type, msgid, util.isError(error) ? error.message : error, result]; | ||
} | ||
@@ -75,7 +80,6 @@ function parseMessage(message: Message) { | ||
function writeMessage(socket: net.Socket, message: Message, option?: msgpack.EncoderOptions, listener?: () => void): void { | ||
function writeMessage(socket: net.Socket, message: Message, option?: msgpack.EncoderOptions) { | ||
const encodeStream = msgpack.createEncodeStream(option); | ||
encodeStream.pipe(socket); | ||
encodeStream.end(message, listener); | ||
encodeStream.unpipe(socket); | ||
return new Promise(resolve => encodeStream.end(message, resolve)).then(() => void (encodeStream.unpipe(socket))); | ||
} | ||
@@ -110,11 +114,12 @@ | ||
public request(method: string, ...parameters: any[]) { | ||
const [message, callback] = createMessage(0, method, parameters); | ||
const [message, callback] = (createMessage(0, method, parameters) as [Message, ResponseListener]); | ||
if (callback) { | ||
send.call(this, message, callback, undefined); | ||
send(this, message, callback); | ||
} else { | ||
return new Promise<[any, number]>((resolve, reject) => { | ||
const executor = (error: string, result: any, msgid: number) => | ||
error ? reject(error) : resolve([result, msgid]); | ||
send.call(this, message, executor, undefined); | ||
}); | ||
const executor = (resolve: (response: [any, number]) => void, reject: (error: string) => void) => { | ||
const listener = (error: string | null, ...response: any[]) => | ||
error ? reject(error) : resolve((response as [any, number])); | ||
send(this, message, listener); | ||
}; | ||
return new Promise<[any, number]>(executor); | ||
} | ||
@@ -132,3 +137,3 @@ } | ||
public call(method: string, ...args: any[]): Promise<[any, number]> | undefined { | ||
return this.request.apply(this, [method, ...args]); | ||
return this.request(method, ...args); | ||
} | ||
@@ -143,7 +148,7 @@ | ||
public notify(method: string, ...parameters: any[]) { | ||
const [message, callback] = createMessage(2, method, parameters); | ||
const [message, callback] = (createMessage(2, method, parameters) as [Message, WriteFinishListener]); | ||
if (callback) { | ||
send.call(this, message, undefined, callback); | ||
send(this, message, undefined, callback); | ||
} else { | ||
return new Promise(resolve => send.call(this, message, undefined, () => resolve())); | ||
return new Promise(resolve => send(this, message, undefined, () => resolve())); | ||
} | ||
@@ -160,9 +165,9 @@ } | ||
// tslint:disable-next-line:no-empty max-line-length | ||
function send(this: Client, message: Message, responseListener: ResponseListener = () => { }, writeFinishListener: WriteFinishListener = () => { }) { | ||
const socket = net.createConnection(this.connectOptions, () => { | ||
writeMessage(socket, message, { codec: this.encodeCodec }, () => { | ||
function send(clinet: Client, message: Message, responseListener: ResponseListener = () => { }, writeFinishListener: WriteFinishListener = () => { }) { | ||
const socket = net.createConnection(clinet.connectOptions, () => { | ||
writeMessage(socket, message, { codec: clinet.encodeCodec }).then(() => { | ||
debug.enabled && debug(`sent message: ${util.inspect(message, false, null, true)}`); | ||
writeFinishListener(); | ||
socket.end(); | ||
}); | ||
socket.end(); | ||
}); | ||
@@ -173,8 +178,8 @@ | ||
debug(`socket event [${eventName}]`); | ||
this.emit.apply(this, [eventName].concat(args)); | ||
clinet.emit(eventName, ...args); | ||
}), socket); | ||
socket.pipe(msgpack.createDecodeStream({ codec: this.decodeCodec })).on('data', response => { | ||
socket.pipe(msgpack.createDecodeStream({ codec: clinet.decodeCodec })).on('data', response => { | ||
debug.enabled && debug(`received message from server: ${util.inspect(response, false, null, true)}`); | ||
const [type, msgid, error, result] = (response as any); // Response message | ||
const { type, msgid, error = null, result } = parseMessage(response as any); // Response message | ||
debug.enabled && assert.deepEqual({ type, msgid }, { type: 1, msgid: parseMessage(message).msgid }); | ||
@@ -220,3 +225,3 @@ responseListener(error, result, msgid); | ||
const callback = type === 2 ? undefined : (error: string, result: any) => { | ||
const response: Message = [1, msgid, error, result]; // Response message | ||
const response: Message = createResponseMessage(1, msgid, error, result); // Response message | ||
writeMessage(socket, response, { codec: encodeCodec }); | ||
@@ -223,0 +228,0 @@ }; |
@@ -76,2 +76,19 @@ import { expect } from 'chai'; | ||
it('should be error', done => { | ||
portfinder.getPortPromise(options).then(port => { | ||
debug({ port }); | ||
server = rpc.createServer().on('foo', (params, callback) => { | ||
callback(new Error('error')); | ||
}); | ||
server.listen(port); | ||
const client = new rpc.Client({ port }); | ||
return client.call('foo', 1, 2, 3); | ||
}).then(done).catch(error => { | ||
expect(error).to.equal('error'); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
@@ -78,0 +95,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
177686
280.67%18
12.5%5195
603.93%2
100%