msgpack-rpc-lite
Advanced tools
Comparing version 0.2.0 to 0.2.1
@@ -23,4 +23,4 @@ "use strict"; | ||
const tr = (object) => object && object.toString().replace(/\s/g, ''); | ||
const equalsIgnoreSpace = (a, b) => tr(a) === tr(b); | ||
// tslint:disable-next-line:only-arrow-functions no-empty | ||
const equalsIgnoreSpace = (lhs, rhs) => tr(lhs) === tr(rhs); | ||
// tslint:disable-next-line:only-arrow-functions no-empty ban-types | ||
const isDoNothingFunction = (fn) => equalsIgnoreSpace(fn, function () { }); | ||
@@ -35,8 +35,13 @@ const enabled = !isDoNothingFunction(debug); | ||
const isFunction = (object) => typeof object === 'function'; | ||
const last = (array) => array[array.length - 1]; | ||
const popUnless = (array, predicate) => predicate(last(array)) ? array.pop() : undefined; | ||
function createMessage(type, method, parameters) { | ||
const params = parameters.slice(0); | ||
const callback = isFunction(params[params.length - 1]) && params.pop(); | ||
const callback = popUnless(params, isFunction); | ||
const message = [].concat(type, type === 0 ? msgidGenerator.next() : [], method, [params]); | ||
return [message, callback]; | ||
} | ||
function createResponseMessage(type, msgid, error, result) { | ||
return [type, msgid, util_1.default.isError(error) ? error.message : error, result]; | ||
} | ||
function parseMessage(message) { | ||
@@ -57,7 +62,6 @@ const msg = message.slice(0); | ||
} | ||
function writeMessage(socket, message, option, listener) { | ||
function writeMessage(socket, message, option) { | ||
const encodeStream = msgpack_lite_1.default.createEncodeStream(option); | ||
encodeStream.pipe(socket); | ||
encodeStream.end(message, listener); | ||
encodeStream.unpipe(socket); | ||
return new Promise(resolve => encodeStream.end(message, resolve)).then(() => void (encodeStream.unpipe(socket))); | ||
} | ||
@@ -93,9 +97,10 @@ /** | ||
if (callback) { | ||
send.call(this, message, callback, undefined); | ||
send(this, message, callback); | ||
} | ||
else { | ||
return new Promise((resolve, reject) => { | ||
const executor = (error, result, msgid) => error ? reject(error) : resolve([result, msgid]); | ||
send.call(this, message, executor, undefined); | ||
}); | ||
const executor = (resolve, reject) => { | ||
const listener = (error, ...response) => error ? reject(error) : resolve(response); | ||
send(this, message, listener); | ||
}; | ||
return new Promise(executor); | ||
} | ||
@@ -112,3 +117,3 @@ } | ||
call(method, ...args) { | ||
return this.request.apply(this, [method, ...args]); | ||
return this.request(method, ...args); | ||
} | ||
@@ -124,6 +129,6 @@ /** | ||
if (callback) { | ||
send.call(this, message, undefined, callback); | ||
send(this, message, undefined, callback); | ||
} | ||
else { | ||
return new Promise(resolve => send.call(this, message, undefined, () => resolve())); | ||
return new Promise(resolve => send(this, message, undefined, () => resolve())); | ||
} | ||
@@ -139,9 +144,9 @@ } | ||
// tslint:disable-next-line:no-empty max-line-length | ||
function send(message, responseListener = () => { }, writeFinishListener = () => { }) { | ||
const socket = net_1.default.createConnection(this.connectOptions, () => { | ||
writeMessage(socket, message, { codec: this.encodeCodec }, () => { | ||
function send(clinet, message, responseListener = () => { }, writeFinishListener = () => { }) { | ||
const socket = net_1.default.createConnection(clinet.connectOptions, () => { | ||
writeMessage(socket, message, { codec: clinet.encodeCodec }).then(() => { | ||
debug.enabled && debug(`sent message: ${util_1.default.inspect(message, false, null, true)}`); | ||
writeFinishListener(); | ||
socket.end(); | ||
}); | ||
socket.end(); | ||
}); | ||
@@ -151,7 +156,7 @@ const socketEvents = ['connect', 'end', 'timeout', 'drain', 'error', 'close']; | ||
debug(`socket event [${eventName}]`); | ||
this.emit.apply(this, [eventName].concat(args)); | ||
clinet.emit(eventName, ...args); | ||
}), socket); | ||
socket.pipe(msgpack_lite_1.default.createDecodeStream({ codec: this.decodeCodec })).on('data', response => { | ||
socket.pipe(msgpack_lite_1.default.createDecodeStream({ codec: clinet.decodeCodec })).on('data', response => { | ||
debug.enabled && debug(`received message from server: ${util_1.default.inspect(response, false, null, true)}`); | ||
const [type, msgid, error, result] = response; // Response message | ||
const { type, msgid, error = null, result } = parseMessage(response); // Response message | ||
debug.enabled && assert_1.default.deepEqual({ type, msgid }, { type: 1, msgid: parseMessage(message).msgid }); | ||
@@ -195,3 +200,3 @@ responseListener(error, result, msgid); | ||
const callback = type === 2 ? undefined : (error, result) => { | ||
const response = [1, msgid, error, result]; // Response message | ||
const response = createResponseMessage(1, msgid, error, result); // Response message | ||
writeMessage(socket, response, { codec: encodeCodec }); | ||
@@ -198,0 +203,0 @@ }; |
{ | ||
"name": "msgpack-rpc-lite", | ||
"version": "0.2.0", | ||
"version": "0.2.1", | ||
"description": "Implementation of MessagePack-RPC with msgpack-lite", | ||
@@ -5,0 +5,0 @@ "main": "./dist/index.js", |
@@ -24,5 +24,5 @@ /* | ||
const tr = (object: any) => object && object.toString().replace(/\s/g, ''); | ||
const equalsIgnoreSpace = (a: any, b: any): boolean => tr(a) === tr(b); | ||
// tslint:disable-next-line:only-arrow-functions no-empty | ||
const isDoNothingFunction = (fn: DebugLog) => equalsIgnoreSpace(fn, function () { }); | ||
const equalsIgnoreSpace = (lhs: any, rhs: any): boolean => tr(lhs) === tr(rhs); | ||
// tslint:disable-next-line:only-arrow-functions no-empty ban-types | ||
const isDoNothingFunction = (fn: Function) => equalsIgnoreSpace(fn, function () { }); | ||
const enabled = !isDoNothingFunction(debug); | ||
@@ -46,13 +46,18 @@ Object.defineProperty(debug, 'enabled', { get() { return enabled; } }); | ||
type Message = RequestMessage | ResponseMessage | NotifyMessage; | ||
type ResponseListener = (error: string, result: any, msgid: number) => void; | ||
type ResponseListener = (error: string | null, result: any, msgid: number) => void; | ||
type WriteFinishListener = () => void; | ||
type SendListener = ResponseListener | WriteFinishListener; | ||
const isFunction = (object: any) => typeof object === 'function'; | ||
const last = (array: any[]) => array[array.length - 1]; | ||
const popUnless = (array: any[], predicate: (object: any) => boolean) => predicate(last(array)) ? array.pop() : undefined; | ||
function createMessage(type: number, method: string, parameters: any[]): [Message, boolean | SendListener] { | ||
function createMessage(type: 0 | 2, method: string, parameters: any[]): [Message, SendListener] { | ||
const params = parameters.slice(0); | ||
const callback = isFunction(params[params.length - 1]) && (params.pop() as SendListener); | ||
const callback: SendListener = popUnless(params, isFunction); | ||
const message = (([] as any[]).concat(type, type === 0 ? msgidGenerator.next() : [], method, [params]) as Message); | ||
return [message, callback]; | ||
} | ||
function createResponseMessage(type: 1, msgid: number, error: string | Error, result: any): ResponseMessage { | ||
return [type, msgid, util.isError(error) ? error.message : error, result]; | ||
} | ||
@@ -75,7 +80,6 @@ function parseMessage(message: Message) { | ||
function writeMessage(socket: net.Socket, message: Message, option?: msgpack.EncoderOptions, listener?: () => void): void { | ||
function writeMessage(socket: net.Socket, message: Message, option?: msgpack.EncoderOptions) { | ||
const encodeStream = msgpack.createEncodeStream(option); | ||
encodeStream.pipe(socket); | ||
encodeStream.end(message, listener); | ||
encodeStream.unpipe(socket); | ||
return new Promise(resolve => encodeStream.end(message, resolve)).then(() => void (encodeStream.unpipe(socket))); | ||
} | ||
@@ -110,11 +114,12 @@ | ||
public request(method: string, ...parameters: any[]) { | ||
const [message, callback] = createMessage(0, method, parameters); | ||
const [message, callback] = (createMessage(0, method, parameters) as [Message, ResponseListener]); | ||
if (callback) { | ||
send.call(this, message, callback, undefined); | ||
send(this, message, callback); | ||
} else { | ||
return new Promise<[any, number]>((resolve, reject) => { | ||
const executor = (error: string, result: any, msgid: number) => | ||
error ? reject(error) : resolve([result, msgid]); | ||
send.call(this, message, executor, undefined); | ||
}); | ||
const executor = (resolve: (response: [any, number]) => void, reject: (error: string) => void) => { | ||
const listener = (error: string | null, ...response: any[]) => | ||
error ? reject(error) : resolve((response as [any, number])); | ||
send(this, message, listener); | ||
}; | ||
return new Promise<[any, number]>(executor); | ||
} | ||
@@ -132,3 +137,3 @@ } | ||
public call(method: string, ...args: any[]): Promise<[any, number]> | undefined { | ||
return this.request.apply(this, [method, ...args]); | ||
return this.request(method, ...args); | ||
} | ||
@@ -143,7 +148,7 @@ | ||
public notify(method: string, ...parameters: any[]) { | ||
const [message, callback] = createMessage(2, method, parameters); | ||
const [message, callback] = (createMessage(2, method, parameters) as [Message, WriteFinishListener]); | ||
if (callback) { | ||
send.call(this, message, undefined, callback); | ||
send(this, message, undefined, callback); | ||
} else { | ||
return new Promise(resolve => send.call(this, message, undefined, () => resolve())); | ||
return new Promise(resolve => send(this, message, undefined, () => resolve())); | ||
} | ||
@@ -160,9 +165,9 @@ } | ||
// tslint:disable-next-line:no-empty max-line-length | ||
function send(this: Client, message: Message, responseListener: ResponseListener = () => { }, writeFinishListener: WriteFinishListener = () => { }) { | ||
const socket = net.createConnection(this.connectOptions, () => { | ||
writeMessage(socket, message, { codec: this.encodeCodec }, () => { | ||
function send(clinet: Client, message: Message, responseListener: ResponseListener = () => { }, writeFinishListener: WriteFinishListener = () => { }) { | ||
const socket = net.createConnection(clinet.connectOptions, () => { | ||
writeMessage(socket, message, { codec: clinet.encodeCodec }).then(() => { | ||
debug.enabled && debug(`sent message: ${util.inspect(message, false, null, true)}`); | ||
writeFinishListener(); | ||
socket.end(); | ||
}); | ||
socket.end(); | ||
}); | ||
@@ -173,8 +178,8 @@ | ||
debug(`socket event [${eventName}]`); | ||
this.emit.apply(this, [eventName].concat(args)); | ||
clinet.emit(eventName, ...args); | ||
}), socket); | ||
socket.pipe(msgpack.createDecodeStream({ codec: this.decodeCodec })).on('data', response => { | ||
socket.pipe(msgpack.createDecodeStream({ codec: clinet.decodeCodec })).on('data', response => { | ||
debug.enabled && debug(`received message from server: ${util.inspect(response, false, null, true)}`); | ||
const [type, msgid, error, result] = (response as any); // Response message | ||
const { type, msgid, error = null, result } = parseMessage(response as any); // Response message | ||
debug.enabled && assert.deepEqual({ type, msgid }, { type: 1, msgid: parseMessage(message).msgid }); | ||
@@ -220,3 +225,3 @@ responseListener(error, result, msgid); | ||
const callback = type === 2 ? undefined : (error: string, result: any) => { | ||
const response: Message = [1, msgid, error, result]; // Response message | ||
const response: Message = createResponseMessage(1, msgid, error, result); // Response message | ||
writeMessage(socket, response, { codec: encodeCodec }); | ||
@@ -223,0 +228,0 @@ }; |
@@ -76,2 +76,19 @@ import { expect } from 'chai'; | ||
it('should be error', done => { | ||
portfinder.getPortPromise(options).then(port => { | ||
debug({ port }); | ||
server = rpc.createServer().on('foo', (params, callback) => { | ||
callback(new Error('error')); | ||
}); | ||
server.listen(port); | ||
const client = new rpc.Client({ port }); | ||
return client.call('foo', 1, 2, 3); | ||
}).then(done).catch(error => { | ||
expect(error).to.equal('error'); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
@@ -78,0 +95,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
177686
18
5195
2