@pgtyped/wire
Advanced tools
Comparing version 2.0.0 to 2.0.1
@@ -18,2 +18,6 @@ /// <reference types="node" /> | ||
} | ||
interface IIncompleteMessageError { | ||
type: 'IncompleteMessageError'; | ||
messageName: string; | ||
} | ||
interface IServerError { | ||
@@ -25,7 +29,6 @@ type: 'ServerError'; | ||
} | ||
export type ParseResult<Params> = IMessagePayload<Params> | IMessageMismatchError | IServerError; | ||
export type ParseResult<Params> = IMessagePayload<Params> | IMessageMismatchError | IServerError | IIncompleteMessageError; | ||
export declare const parseMessage: <Params extends object>(message: IServerMessage<Params>, buf: Buffer, messageOffset?: number) => ParseResult<Params>; | ||
export declare const buildMessage: <Params extends object>(message: IClientMessage<Params>, parameters: Params) => Buffer; | ||
export declare const parseOneOf: (messages: Array<IServerMessage<any>>, buffer: Buffer, offset: number) => ParseResult<object>; | ||
export declare const parseMultiple: (messages: Array<IServerMessage<any>>, buffer: Buffer, offset: number) => ParseResult<object>[]; | ||
export {}; |
@@ -60,2 +60,8 @@ import { byte1, byte4, byteN, cByteDict, cString, cStringUnknownLengthArray, int16, int32, sumSize, notNullTerminatedString, } from './helpers.js'; | ||
const messageEnd = messageSize + messageOffset + 1; | ||
if (messageEnd > buf.length) { | ||
return { | ||
type: 'IncompleteMessageError', | ||
messageName: message.name, | ||
}; | ||
} | ||
if (indicator !== expectedIndicator && !isUnexpectedErrorMessage) { | ||
@@ -181,18 +187,2 @@ return { | ||
}; | ||
export const parseMultiple = (messages, buffer, offset) => { | ||
const result = []; | ||
const bufferEnd = buffer.byteLength; | ||
let lastBufferOffset = offset; | ||
while (lastBufferOffset < bufferEnd) { | ||
const parseResult = parseOneOf(messages, buffer, lastBufferOffset); | ||
if (parseResult.type !== 'MessageMismatchError') { | ||
result.push(parseResult); | ||
lastBufferOffset = parseResult.bufferOffset; | ||
} | ||
else { | ||
return [parseResult]; | ||
} | ||
} | ||
return result; | ||
}; | ||
//# sourceMappingURL=protocol.js.map |
@@ -13,4 +13,4 @@ /// <reference types="node" /> | ||
export declare class AsyncQueue { | ||
queue: Buffer[]; | ||
bufferOffset: number; | ||
buffer: Buffer; | ||
socket: net.Socket; | ||
@@ -20,3 +20,3 @@ replyPending: { | ||
reject: (data: any) => any; | ||
parser: (buf: Buffer, offset: number) => ParseResult<object> | ParseResult<object>[]; | ||
parser: (buf: Buffer, offset: number) => ParseResult<object>; | ||
} | null; | ||
@@ -37,10 +37,3 @@ constructor(); | ||
reply<Messages extends Array<IServerMessage<any>>>(...serverMessages: Messages): Promise<Boxified<Messages>[number]>; | ||
/** | ||
* Waits for the next buffer consisting of multiple messages to arrive and parses it, resolving with the parsed | ||
* values. | ||
* @param serverMessages The array of messages to match | ||
* @returns The parsed params | ||
*/ | ||
multiMessageReply<Messages extends Array<IServerMessage<any>>>(...serverMessages: Messages): Promise<Record<string, any>>; | ||
} | ||
export {}; |
@@ -23,3 +23,3 @@ var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
import * as tls from 'tls'; | ||
import { buildMessage, parseMessage, parseMultiple, parseOneOf, } from './protocol.js'; | ||
import { buildMessage, parseMessage, parseOneOf, } from './protocol.js'; | ||
import { messages } from './messages.js'; | ||
@@ -30,4 +30,4 @@ import debugBase from 'debug'; | ||
constructor() { | ||
this.queue = []; | ||
this.bufferOffset = 0; | ||
this.buffer = Buffer.alloc(0); | ||
this.replyPending = null; | ||
@@ -42,3 +42,3 @@ this.socket = new net.Socket({}); | ||
debug('received %o bytes', buffer.length); | ||
this.queue.push(buffer); | ||
this.buffer = Buffer.concat([this.buffer, buffer]); | ||
this.processQueue(); | ||
@@ -105,50 +105,29 @@ }); | ||
processQueue() { | ||
if (!this.replyPending || this.queue.length === 0) { | ||
if (!this.replyPending || this.buffer.length === 0) { | ||
return; | ||
} | ||
const buf = this.queue[0]; | ||
const parsed = this.replyPending.parser(buf, this.bufferOffset); | ||
if (Array.isArray(parsed)) { | ||
// Move queue cursor in any case | ||
const lastBufferOffset = parsed[parsed.length - 1].bufferOffset; | ||
if (lastBufferOffset >= buf.length) { | ||
this.bufferOffset = 0; | ||
this.queue.pop(); | ||
} | ||
else { | ||
this.bufferOffset = lastBufferOffset; | ||
} | ||
const res = parsed.reduce((acc, result) => (Object.assign(Object.assign({}, acc), (result.type !== 'ServerError' && | ||
result.type !== 'MessageMismatchError' | ||
? { [result.messageName]: result.data } | ||
: {}))), {}); | ||
if (!Object.keys(res).length) { | ||
this.replyPending.reject(parsed); | ||
} | ||
else { | ||
debug('resolved awaited %o message', res); | ||
this.replyPending.resolve(res); | ||
} | ||
const parsed = this.replyPending.parser(this.buffer, this.bufferOffset); | ||
if (parsed.type === 'IncompleteMessageError') { | ||
debug('received incomplete message'); | ||
return; | ||
} | ||
// Move queue cursor in any case | ||
if (parsed.bufferOffset === this.buffer.length) { | ||
this.bufferOffset = 0; | ||
this.buffer = Buffer.alloc(0); | ||
} | ||
else { | ||
// Move queue cursor in any case | ||
if (parsed.bufferOffset >= buf.length) { | ||
this.bufferOffset = 0; | ||
this.queue.pop(); | ||
} | ||
else { | ||
this.bufferOffset = parsed.bufferOffset; | ||
} | ||
if (parsed.type === 'ServerError') { | ||
this.replyPending.reject(parsed); | ||
} | ||
else if (parsed.type === 'MessagePayload') { | ||
debug('resolved awaited %o message', parsed.messageName); | ||
this.replyPending.resolve(parsed.data); | ||
} | ||
else { | ||
debug('received ignored message'); | ||
this.processQueue(); | ||
} | ||
this.bufferOffset = parsed.bufferOffset; | ||
} | ||
if (parsed.type === 'ServerError') { | ||
this.replyPending.reject(parsed); | ||
} | ||
else if (parsed.type === 'MessagePayload') { | ||
debug('resolved awaited %o message', parsed.messageName); | ||
this.replyPending.resolve(parsed.data); | ||
} | ||
else { | ||
debug('received ignored message'); | ||
this.processQueue(); | ||
} | ||
} | ||
@@ -179,21 +158,3 @@ /** | ||
} | ||
/** | ||
* Waits for the next buffer consisting of multiple messages to arrive and parses it, resolving with the parsed | ||
* values. | ||
* @param serverMessages The array of messages to match | ||
* @returns The parsed params | ||
*/ | ||
multiMessageReply(...serverMessages) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
return new Promise((resolve, reject) => { | ||
this.replyPending = { | ||
resolve, | ||
reject, | ||
parser: (buf, offset) => parseMultiple(serverMessages, buf, offset), | ||
}; | ||
this.processQueue(); | ||
}); | ||
}); | ||
} | ||
} | ||
//# sourceMappingURL=queue.js.map |
{ | ||
"name": "@pgtyped/wire", | ||
"version": "2.0.0", | ||
"version": "2.0.1", | ||
"type": "module", | ||
@@ -38,3 +38,3 @@ "exports": { | ||
}, | ||
"gitHead": "618ba86befcfe2e474afd46e4321ca673141aa1a" | ||
"gitHead": "fdcff0ffd5d6067356aac4661dcf349c47090197" | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
63878
1060