fluentd-node
Advanced tools
Comparing version 0.0.3 to 0.0.4
import { FluentSocket, FluentSocketOptions } from "./socket"; | ||
import * as protocol from "./protocol"; | ||
/** | ||
* The authentication options for the client | ||
*/ | ||
export declare type FluentAuthOptions = { | ||
/** | ||
* The client host name (required). | ||
* | ||
* Must be unique to this process | ||
*/ | ||
clientHostname: string; | ||
/** | ||
* The shared key with the server. (required) | ||
*/ | ||
sharedKey: string; | ||
/** | ||
* The username to authenticate with. (optional) | ||
*/ | ||
username?: string; | ||
/** | ||
* The password to authenticate with. (optional) | ||
*/ | ||
password?: string; | ||
}; | ||
/** | ||
* An implementation of FluentSocket which authenticates the socket using the [Forward protocol Handshake](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#handshake-messages) | ||
*/ | ||
export declare class FluentAuthSocket extends FluentSocket { | ||
@@ -16,8 +36,41 @@ private authState; | ||
private password; | ||
/** | ||
* Creates a new instance of the socket | ||
* @param authOptions The authentication options to use | ||
* @param socketOptions The socket options to pass to the underlying socket | ||
*/ | ||
constructor(authOptions: FluentAuthOptions, socketOptions?: FluentSocketOptions); | ||
/** | ||
* Once the socket is connected, we expect a HELO | ||
*/ | ||
protected onConnected(): void; | ||
/** | ||
* When the socket is closed, we're unauthenticated | ||
*/ | ||
protected onClose(): void; | ||
/** | ||
* Handles messages from the server | ||
* | ||
* If we're waiting for a message, this will trigger it, otherwise just forward to the superclass. | ||
* | ||
* @param message The message to check | ||
*/ | ||
protected onMessage(message: protocol.ServerMessage): void; | ||
/** | ||
* Called on a HELO message | ||
* | ||
* Should parse the message, and send back a PING | ||
* | ||
* @param message The HELO message | ||
*/ | ||
private handleHelo; | ||
/** | ||
* Called on a PONG message | ||
* | ||
* Should parse and validate the message, and if valid, establish the connection | ||
* | ||
* @param message The PONG message | ||
* @returns void | ||
*/ | ||
private handlePong; | ||
} |
@@ -10,8 +10,28 @@ "use strict"; | ||
(function (FluentAuthState) { | ||
/** | ||
* The client is not authenticated (socket is not connected) | ||
*/ | ||
FluentAuthState[FluentAuthState["UNAUTHENTICATED"] = 0] = "UNAUTHENTICATED"; | ||
/** | ||
* The client is waiting for a HELO from the server | ||
*/ | ||
FluentAuthState[FluentAuthState["HELO"] = 1] = "HELO"; | ||
/** | ||
* The client is waiting for a PONG from the server | ||
*/ | ||
FluentAuthState[FluentAuthState["PONG"] = 2] = "PONG"; | ||
/** | ||
* The client is fully authenticated | ||
*/ | ||
FluentAuthState[FluentAuthState["AUTHENTICATED"] = 3] = "AUTHENTICATED"; | ||
})(FluentAuthState || (FluentAuthState = {})); | ||
/** | ||
* An implementation of FluentSocket which authenticates the socket using the [Forward protocol Handshake](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#handshake-messages) | ||
*/ | ||
class FluentAuthSocket extends socket_1.FluentSocket { | ||
/** | ||
* Creates a new instance of the socket | ||
* @param authOptions The authentication options to use | ||
* @param socketOptions The socket options to pass to the underlying socket | ||
*/ | ||
constructor(authOptions, socketOptions = {}) { | ||
@@ -29,5 +49,11 @@ super(socketOptions); | ||
} | ||
/** | ||
* Once the socket is connected, we expect a HELO | ||
*/ | ||
onConnected() { | ||
this.authState = FluentAuthState.HELO; | ||
} | ||
/** | ||
* When the socket is closed, we're unauthenticated | ||
*/ | ||
onClose() { | ||
@@ -37,2 +63,9 @@ this.authState = FluentAuthState.UNAUTHENTICATED; | ||
} | ||
/** | ||
* Handles messages from the server | ||
* | ||
* If we're waiting for a message, this will trigger it, otherwise just forward to the superclass. | ||
* | ||
* @param message The message to check | ||
*/ | ||
onMessage(message) { | ||
@@ -50,5 +83,12 @@ if (protocol.isHelo(message) && this.authState === FluentAuthState.HELO) { | ||
else { | ||
this.close(socket_1.CloseState.CLOSE, new error_1.ResponseError("Received unexpected message")); | ||
this.close(socket_1.CloseState.CLOSE, new error_1.UnexpectedMessageError("Received unexpected message")); | ||
} | ||
} | ||
/** | ||
* Called on a HELO message | ||
* | ||
* Should parse the message, and send back a PING | ||
* | ||
* @param message The HELO message | ||
*/ | ||
handleHelo(message) { | ||
@@ -69,2 +109,10 @@ const heloOptions = protocol.parseHelo(message); | ||
} | ||
/** | ||
* Called on a PONG message | ||
* | ||
* Should parse and validate the message, and if valid, establish the connection | ||
* | ||
* @param message The PONG message | ||
* @returns void | ||
*/ | ||
handlePong(message) { | ||
@@ -71,0 +119,0 @@ try { |
@@ -6,21 +6,110 @@ import EventTime from "./event_time"; | ||
import { EventRetryOptions } from "./event_retrier"; | ||
declare type EventModes = "Message" | "Forward" | "PackedForward" | "CompressedPackedForward"; | ||
declare type Timestamp = number | Date | EventTime; | ||
declare type AckOptions = { | ||
/** | ||
* The set of accepted event modes. See [Forward protocol spec](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#event-modes) | ||
* | ||
* `Message` will send each event to FluentD individually. | ||
* | ||
* `Forward` will collect the events together by tag, and send them together to FluentD in a single packet. | ||
* This is more efficient with a `flushInterval` to batch these together. | ||
* | ||
* `PackedForward` will behave the same as `Forward`, but will pack the events as part of entering the queue. This saves memory and bandwidth. | ||
* | ||
* `CompressedPackedForward` will behave the same as `PackedForward`, but additionally compress the items before emission, saving more bandwidth. | ||
*/ | ||
export declare type EventModes = "Message" | "Forward" | "PackedForward" | "CompressedPackedForward"; | ||
/** | ||
* The set of accepted Timestamp values | ||
*/ | ||
export declare type Timestamp = number | Date | EventTime; | ||
/** | ||
* Acknowledgement settings | ||
*/ | ||
export declare type AckOptions = { | ||
/** | ||
* How long to wait for an acknowledgement from the server | ||
*/ | ||
ackTimeout: number; | ||
}; | ||
/** | ||
* The constructor options passed to the client | ||
*/ | ||
export declare type FluentClientOptions = { | ||
/** | ||
* The event mode to use. Defaults to PackedForward | ||
*/ | ||
eventMode?: EventModes; | ||
/** | ||
* The connection options. See subtype for defaults. | ||
*/ | ||
socket?: FluentSocketOptions; | ||
/** | ||
* The fluentd security options. See subtype for defaults. | ||
*/ | ||
security?: FluentAuthOptions; | ||
/** | ||
* Acknowledgement settings. | ||
*/ | ||
ack?: Partial<AckOptions>; | ||
/** | ||
* How long to wait to flush the queued events | ||
* | ||
* Defaults to 0 | ||
*/ | ||
flushInterval?: number; | ||
/** | ||
* The timestamp resolution of events passed to FluentD. | ||
* | ||
* Defaults to false (seconds). If true, the resolution will be in milliseconds | ||
*/ | ||
milliseconds?: boolean; | ||
/** | ||
* The limit at which the queue needs to be flushed. | ||
* | ||
* This checks the memory size of the queue, which is only useful with `PackedForward` and `PackedForwardCompressed`. | ||
* | ||
* Useful with flushInterval to limit the size of the queue | ||
*/ | ||
sendQueueFlushSize?: number; | ||
/** | ||
* The limit at which the queue needs to be flushed. | ||
* | ||
* This checks the number of events in the queue, which is useful with all event modes. | ||
* | ||
* Useful with flushInterval to limit the size of the queue | ||
*/ | ||
sendQueueFlushLength?: number; | ||
/** | ||
* The limit at which we start dropping events | ||
* | ||
* This checks the memory size of the queue, which is only useful with `PackedForward` and `PackedForwardCompressed`. | ||
* | ||
* Prevents the queue from growing to an unbounded size and exhausting memory. | ||
*/ | ||
sendQueueMaxSize?: number; | ||
/** | ||
* The limit at which we start dropping events | ||
* | ||
* This checks the number of events in the queue, which is useful with all event modes. | ||
* | ||
* Prevents the queue from growing to an unbounded size and exhausting memory. | ||
*/ | ||
sendQueueMaxLength?: number; | ||
/** | ||
* An error handler which will receive socket error events | ||
* | ||
* Useful for logging, these will be handled internally | ||
*/ | ||
onSocketError?: (err: Error) => void; | ||
/** | ||
* Retry event submission on failure | ||
* | ||
* Warning: This effectively keeps the event in memory until it is successfully sent or retries exhausted | ||
* | ||
* See subtype for defaults | ||
*/ | ||
eventRetry?: Partial<EventRetryOptions>; | ||
}; | ||
/** | ||
* A Fluent Client. Connects to a FluentD server using the [Forward protocol](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1). | ||
*/ | ||
export declare class FluentClient { | ||
@@ -44,10 +133,73 @@ private tag_prefix; | ||
private willFlushNextTick; | ||
constructor(tag_prefix: string, options?: FluentClientOptions); | ||
/** | ||
* Creates a new FluentClient | ||
* | ||
* @param tag_prefix A prefix to prefix to all tags. For example, passing the prefix "foo" will cause `emit("bar", data)` to emit with `foo.bar`. | ||
* @param options The client options | ||
*/ | ||
constructor(tag_prefix?: string | null, options?: FluentClientOptions); | ||
/** | ||
* Constructs a new socket | ||
* | ||
* @param security The security options, if any | ||
* @param options The socket options, if any | ||
* @returns A new FluentSocket | ||
*/ | ||
private createSocket; | ||
/** | ||
* Emits an event to the Fluent Server | ||
* | ||
* @param data The event to emit (required) | ||
* @returns A Promise, which resolves when the event is successfully sent to the server. | ||
* Enabling acknowledgements waits until the server indicates they have received the event. | ||
*/ | ||
emit(data: protocol.EventRecord): Promise<void>; | ||
/** | ||
* Emits an event to the Fluent Server | ||
* | ||
* @param data The event to emit (required) | ||
* @param timestamp The timestamp of the event (optional) | ||
* @returns A Promise, which resolves when the event is successfully sent to the server. | ||
* Enabling acknowledgements waits until the server indicates they have received the event. | ||
*/ | ||
emit(data: protocol.EventRecord, timestamp: Timestamp): Promise<void>; | ||
/** | ||
* Emits an event to the Fluent Server | ||
* | ||
* @param label The label to emit the data with (optional) | ||
* @param data The event to emit (required) | ||
* @returns A Promise, which resolves when the event is successfully sent to the server. | ||
* Enabling acknowledgements waits until the server indicates they have received the event. | ||
*/ | ||
emit(label: string, data: protocol.EventRecord): Promise<void>; | ||
/** | ||
* Emits an event to the Fluent Server | ||
* | ||
* @param label The label to emit the data with (optional) | ||
* @param data The event to emit (required) | ||
* @param timestamp The timestamp of the event (optional) | ||
* @returns A Promise, which resolves when the event is successfully sent to the server. | ||
* Enabling acknowledgements waits until the server indicates they have received the event. | ||
*/ | ||
emit(label: string, data: protocol.EventRecord, timestamp: Timestamp): Promise<void>; | ||
private sendEvent; | ||
/** | ||
* Pushes an event onto the sendQueue | ||
* | ||
* Also drops items from the queue if it is too large (size/length) | ||
* | ||
* @param tag The event tag | ||
* @param time The event timestamp | ||
* @param data The event data | ||
* @returns The promise from the sendQueue | ||
*/ | ||
private pushEvent; | ||
/** | ||
* Called once the underlying socket is writable | ||
* | ||
* Should attempt a flush | ||
*/ | ||
private handleWritable; | ||
/** | ||
* Connects the client. Happens automatically during construction, but can be called after a `disconnect()` to resume the client. | ||
*/ | ||
connect(): void; | ||
@@ -64,11 +216,57 @@ /** | ||
disconnect(): Promise<void>; | ||
/** | ||
* Creates a tag from the passed label and the constructor `tagPrefix`. | ||
* | ||
* @param label The label to create a tag from | ||
* @returns The constructed tag, or `null`. | ||
*/ | ||
private makeTag; | ||
/** | ||
* Flushes to the socket internally | ||
* | ||
* Managed by `flush` to not be called multiple times | ||
* @returns true if there are more events in the queue to flush, false otherwise | ||
*/ | ||
private innerFlush; | ||
/** | ||
* Flushes the event queue. Queues up the flushes for the next tick, preventing multiple flushes at the same time. | ||
* | ||
* @returns A promise, which resolves with a boolean indicating if there are more events to flush. | ||
*/ | ||
flush(): Promise<boolean>; | ||
/** | ||
* Potentially triggers a flush | ||
* | ||
* If we're flushing on an interval, check if the queue (size/length) limits have been reached, and otherwise schedule a new flush | ||
* | ||
* If not, just flush | ||
* @returns | ||
*/ | ||
private maybeFlush; | ||
/** | ||
* Send the front item of the queue to the socket | ||
* @returns True if there was something to send | ||
*/ | ||
private sendNext; | ||
/** | ||
* Creates an event for how long to wait for the ack | ||
* | ||
* @param chunkId The chunk ID we're waiting to ack | ||
* @param deferred The deferred to reject on timeout | ||
* @param ackTimeout The timeout length | ||
* @returns | ||
*/ | ||
private setupAckTimeout; | ||
/** | ||
* Called on an acknowledgement from the socket | ||
* | ||
* @param chunkId The chunk ID the socket has acknowledged | ||
* @returns | ||
*/ | ||
private handleAck; | ||
/** | ||
* Fails all acknowledgements | ||
* Called on shutdown | ||
*/ | ||
private clearAcks; | ||
} | ||
export {}; |
@@ -11,4 +11,13 @@ "use strict"; | ||
const event_retrier_1 = require("./event_retrier"); | ||
/** | ||
* A Fluent Client. Connects to a FluentD server using the [Forward protocol](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1). | ||
*/ | ||
class FluentClient { | ||
constructor(tag_prefix, options = {}) { | ||
/** | ||
* Creates a new FluentClient | ||
* | ||
* @param tag_prefix A prefix to prefix to all tags. For example, passing the prefix "foo" will cause `emit("bar", data)` to emit with `foo.bar`. | ||
* @param options The client options | ||
*/ | ||
constructor(tag_prefix = null, options = {}) { | ||
this.ackQueue = new Map(); | ||
@@ -51,6 +60,4 @@ this.nextFlushTimeoutId = null; | ||
this.sendQueueFlushLength = options.sendQueueFlushLength || +Infinity; | ||
this.sendQueueMaxSize = | ||
options.sendQueueMaxSize || 2 * this.sendQueueFlushSize; | ||
this.sendQueueMaxLength = | ||
options.sendQueueMaxLength || 2 * this.sendQueueFlushSize; | ||
this.sendQueueMaxSize = options.sendQueueMaxSize || +Infinity; | ||
this.sendQueueMaxLength = options.sendQueueMaxLength || +Infinity; | ||
this.socket = this.createSocket(options.security, options.socket); | ||
@@ -64,2 +71,9 @@ this.socket.on("writable", () => this.handleWritable()); | ||
} | ||
/** | ||
* Constructs a new socket | ||
* | ||
* @param security The security options, if any | ||
* @param options The socket options, if any | ||
* @returns A new FluentSocket | ||
*/ | ||
createSocket(security, options) { | ||
@@ -124,9 +138,19 @@ if (security) { | ||
if (this.retrier !== null) { | ||
return this.retrier.retryPromise(() => this.sendEvent(tag, time, data)); | ||
return this.retrier.retryPromise(() => this.pushEvent(tag, time, data)); | ||
} | ||
else { | ||
return this.sendEvent(tag, time, data); | ||
return this.pushEvent(tag, time, data); | ||
} | ||
} | ||
sendEvent(tag, time, data) { | ||
/** | ||
* Pushes an event onto the sendQueue | ||
* | ||
* Also drops items from the queue if it is too large (size/length) | ||
* | ||
* @param tag The event tag | ||
* @param time The event timestamp | ||
* @param data The event data | ||
* @returns The promise from the sendQueue | ||
*/ | ||
pushEvent(tag, time, data) { | ||
const promise = this.sendQueue.push(tag, time, data); | ||
@@ -146,5 +170,13 @@ if (this.sendQueue.queueSize > this.sendQueueMaxSize) { | ||
} | ||
/** | ||
* Called once the underlying socket is writable | ||
* | ||
* Should attempt a flush | ||
*/ | ||
handleWritable() { | ||
this.maybeFlush(); | ||
} | ||
/** | ||
* Connects the client. Happens automatically during construction, but can be called after a `disconnect()` to resume the client. | ||
*/ | ||
connect() { | ||
@@ -182,2 +214,8 @@ this.socket.connect(); | ||
} | ||
/** | ||
* Creates a tag from the passed label and the constructor `tagPrefix`. | ||
* | ||
* @param label The label to create a tag from | ||
* @returns The constructed tag, or `null`. | ||
*/ | ||
makeTag(label) { | ||
@@ -196,2 +234,8 @@ let tag = null; | ||
} | ||
/** | ||
* Flushes to the socket internally | ||
* | ||
* Managed by `flush` to not be called multiple times | ||
* @returns true if there are more events in the queue to flush, false otherwise | ||
*/ | ||
innerFlush() { | ||
@@ -219,2 +263,7 @@ if (this.sendQueue.queueLength === 0) { | ||
} | ||
/** | ||
* Flushes the event queue. Queues up the flushes for the next tick, preventing multiple flushes at the same time. | ||
* | ||
* @returns A promise, which resolves with a boolean indicating if there are more events to flush. | ||
*/ | ||
flush() { | ||
@@ -230,2 +279,10 @@ // Prevent duplicate flushes next tick | ||
} | ||
/** | ||
* Potentially triggers a flush | ||
* | ||
* If we're flushing on an interval, check if the queue (size/length) limits have been reached, and otherwise schedule a new flush | ||
* | ||
* If not, just flush | ||
* @returns | ||
*/ | ||
maybeFlush() { | ||
@@ -264,2 +321,6 @@ // nothing to flush | ||
} | ||
/** | ||
* Send the front item of the queue to the socket | ||
* @returns True if there was something to send | ||
*/ | ||
sendNext() { | ||
@@ -302,10 +363,25 @@ let chunk; | ||
} | ||
/** | ||
* Creates an event for how long to wait for the ack | ||
* | ||
* @param chunkId The chunk ID we're waiting to ack | ||
* @param deferred The deferred to reject on timeout | ||
* @param ackTimeout The timeout length | ||
* @returns | ||
*/ | ||
setupAckTimeout(chunkId, deferred, ackTimeout) { | ||
return setTimeout(() => { | ||
// If the chunk isn't in the queue, then we must have removed it somewhere, assume that it didn't time out | ||
if (this.ackQueue.has(chunkId)) { | ||
deferred.reject(new error_1.AckTimeoutError("ack response timeout")); | ||
this.ackQueue.delete(chunkId); | ||
} | ||
deferred.reject(new error_1.AckTimeoutError("ack response timeout")); | ||
}, ackTimeout); | ||
} | ||
/** | ||
* Called on an acknowledgement from the socket | ||
* | ||
* @param chunkId The chunk ID the socket has acknowledged | ||
* @returns | ||
*/ | ||
handleAck(chunkId) { | ||
@@ -323,2 +399,6 @@ if (!this.ackQueue.has(chunkId)) { | ||
} | ||
/** | ||
* Fails all acknowledgements | ||
* Called on shutdown | ||
*/ | ||
clearAcks() { | ||
@@ -325,0 +405,0 @@ for (const data of this.ackQueue.values()) { |
declare class BaseError extends Error { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown on configuration errors, e.g bad event modes | ||
*/ | ||
export declare class ConfigError extends BaseError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown when an event is emitted without either passing the tag to `client.emit` or providing a `tag_prefix`. | ||
*/ | ||
export declare class MissingTagError extends BaseError { | ||
constructor(message: string); | ||
} | ||
export declare class ResponseError extends BaseError { | ||
/** | ||
* Thrown when a client/server receives an unexpected/invalid message | ||
*/ | ||
export declare class UnexpectedMessageError extends BaseError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown when a client waiting for an acknowledgement times out | ||
*/ | ||
export declare class AckTimeoutError extends BaseError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown when a client tries to emit an invalid data format, e.g string as timestamp, string event, etc. | ||
*/ | ||
export declare class DataTypeError extends BaseError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown when a socket times out, but in a weird state. | ||
*/ | ||
export declare class SocketTimeoutError extends BaseError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown when a client tries to write to a socket, but the socket is not writable | ||
*/ | ||
export declare class SocketNotWritableError extends BaseError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown when an event is dropped for any reason | ||
*/ | ||
export declare class DroppedError extends BaseError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown when an event is dropped as a result of clearing out the queue | ||
*/ | ||
export declare class ClearDroppedError extends DroppedError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown when authentication fails on either the client or the server | ||
*/ | ||
export declare class AuthError extends BaseError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown into the ack queue on shutdown, terminating all promises waiting for an ack | ||
*/ | ||
export declare class ShutdownError extends BaseError { | ||
constructor(message: string); | ||
} | ||
export declare class SharedKeyMismatchError extends BaseError { | ||
/** | ||
* Thrown when the shared key doesn't match | ||
*/ | ||
export declare class SharedKeyMismatchError extends AuthError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown when a message could not be decoded properly | ||
*/ | ||
export declare class DecodeError extends BaseError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown when trying to call `connect()` on a fatal socket | ||
*/ | ||
export declare class FatalSocketError extends BaseError { | ||
@@ -41,0 +86,0 @@ constructor(message: string); |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.FatalSocketError = exports.DecodeError = exports.SharedKeyMismatchError = exports.ShutdownError = exports.AuthError = exports.DroppedError = exports.SocketNotWritableError = exports.SocketTimeoutError = exports.DataTypeError = exports.AckTimeoutError = exports.ResponseError = exports.MissingTagError = exports.ConfigError = void 0; | ||
exports.FatalSocketError = exports.DecodeError = exports.SharedKeyMismatchError = exports.ShutdownError = exports.AuthError = exports.ClearDroppedError = exports.DroppedError = exports.SocketNotWritableError = exports.SocketTimeoutError = exports.DataTypeError = exports.AckTimeoutError = exports.UnexpectedMessageError = exports.MissingTagError = exports.ConfigError = void 0; | ||
class BaseError extends Error { | ||
@@ -12,2 +12,5 @@ constructor(message) { | ||
} | ||
/** | ||
* Thrown on configuration errors, e.g bad event modes | ||
*/ | ||
class ConfigError extends BaseError { | ||
@@ -19,2 +22,5 @@ constructor(message) { | ||
exports.ConfigError = ConfigError; | ||
/** | ||
* Thrown when an event is emitted without either passing the tag to `client.emit` or providing a `tag_prefix`. | ||
*/ | ||
class MissingTagError extends BaseError { | ||
@@ -26,3 +32,6 @@ constructor(message) { | ||
exports.MissingTagError = MissingTagError; | ||
class ResponseError extends BaseError { | ||
/** | ||
* Thrown when a client/server receives an unexpected/invalid message | ||
*/ | ||
class UnexpectedMessageError extends BaseError { | ||
constructor(message) { | ||
@@ -32,3 +41,6 @@ super(message); | ||
} | ||
exports.ResponseError = ResponseError; | ||
exports.UnexpectedMessageError = UnexpectedMessageError; | ||
/** | ||
* Thrown when a client waiting for an acknowledgement times out | ||
*/ | ||
class AckTimeoutError extends BaseError { | ||
@@ -40,2 +52,5 @@ constructor(message) { | ||
exports.AckTimeoutError = AckTimeoutError; | ||
/** | ||
* Thrown when a client tries to emit an invalid data format, e.g string as timestamp, string event, etc. | ||
*/ | ||
class DataTypeError extends BaseError { | ||
@@ -47,2 +62,5 @@ constructor(message) { | ||
exports.DataTypeError = DataTypeError; | ||
/** | ||
* Thrown when a socket times out, but in a weird state. | ||
*/ | ||
class SocketTimeoutError extends BaseError { | ||
@@ -54,2 +72,5 @@ constructor(message) { | ||
exports.SocketTimeoutError = SocketTimeoutError; | ||
/** | ||
* Thrown when a client tries to write to a socket, but the socket is not writable | ||
*/ | ||
class SocketNotWritableError extends BaseError { | ||
@@ -61,2 +82,5 @@ constructor(message) { | ||
exports.SocketNotWritableError = SocketNotWritableError; | ||
/** | ||
* Thrown when an event is dropped for any reason | ||
*/ | ||
class DroppedError extends BaseError { | ||
@@ -68,2 +92,14 @@ constructor(message) { | ||
exports.DroppedError = DroppedError; | ||
/** | ||
* Thrown when an event is dropped as a result of clearing out the queue | ||
*/ | ||
class ClearDroppedError extends DroppedError { | ||
constructor(message) { | ||
super(message); | ||
} | ||
} | ||
exports.ClearDroppedError = ClearDroppedError; | ||
/** | ||
* Thrown when authentication fails on either the client or the server | ||
*/ | ||
class AuthError extends BaseError { | ||
@@ -75,2 +111,5 @@ constructor(message) { | ||
exports.AuthError = AuthError; | ||
/** | ||
* Thrown into the ack queue on shutdown, terminating all promises waiting for an ack | ||
*/ | ||
class ShutdownError extends BaseError { | ||
@@ -82,3 +121,6 @@ constructor(message) { | ||
exports.ShutdownError = ShutdownError; | ||
class SharedKeyMismatchError extends BaseError { | ||
/** | ||
* Thrown when the shared key doesn't match | ||
*/ | ||
class SharedKeyMismatchError extends AuthError { | ||
constructor(message) { | ||
@@ -89,2 +131,5 @@ super(message); | ||
exports.SharedKeyMismatchError = SharedKeyMismatchError; | ||
/** | ||
* Thrown when a message could not be decoded properly | ||
*/ | ||
class DecodeError extends BaseError { | ||
@@ -96,2 +141,5 @@ constructor(message) { | ||
exports.DecodeError = DecodeError; | ||
/** | ||
* Thrown when trying to call `connect()` on a fatal socket | ||
*/ | ||
class FatalSocketError extends BaseError { | ||
@@ -98,0 +146,0 @@ constructor(message) { |
@@ -0,13 +1,55 @@ | ||
/** | ||
* Event retry settings | ||
* | ||
* The parameters represent an exponential backoff formula: | ||
* min(maxDelay, max(minDelay, backoff^attempts * delay)) | ||
*/ | ||
export declare type EventRetryOptions = { | ||
/** | ||
* How often we retry each event | ||
* | ||
* Defaults to 4 | ||
*/ | ||
attempts: number; | ||
/** | ||
* The backoff factor for each attempt | ||
* | ||
* Defaults to 2 | ||
*/ | ||
backoff: number; | ||
/** | ||
* The delay factor for each attempt | ||
* | ||
* Defaults to 100 | ||
*/ | ||
delay: number; | ||
/** | ||
* The global minimum delay | ||
*/ | ||
minDelay: number; | ||
/** | ||
* The global maximum delay | ||
*/ | ||
maxDelay: number; | ||
/** | ||
* Called with each error | ||
* | ||
* Can be used for logging, or if the error is non-retryable, this callback can `throw` the error to short circuit the callback. | ||
*/ | ||
onError: (err: Error) => void; | ||
}; | ||
/** | ||
* Provides retry logic for a promise, with failure cases | ||
*/ | ||
export declare class EventRetrier { | ||
private options; | ||
constructor(opts?: Partial<EventRetryOptions>); | ||
/** | ||
* Retry the promise | ||
* | ||
* Attempts the promise in an infinite loop, and retries according to the logic in EventRetryOptions | ||
* @param makePromise An async function to retry | ||
* @returns A Promise which succeeds if the async function succeeds, or has exhausted retry attempts | ||
*/ | ||
retryPromise<T>(makePromise: () => Promise<T>): Promise<T>; | ||
} |
@@ -5,2 +5,5 @@ "use strict"; | ||
const error_1 = require("./error"); | ||
/** | ||
* Provides retry logic for a promise, with failure cases | ||
*/ | ||
class EventRetrier { | ||
@@ -18,2 +21,9 @@ constructor(opts = {}) { | ||
} | ||
/** | ||
* Retry the promise | ||
* | ||
* Attempts the promise in an infinite loop, and retries according to the logic in EventRetryOptions | ||
* @param makePromise An async function to retry | ||
* @returns A Promise which succeeds if the async function succeeds, or has exhausted retry attempts | ||
*/ | ||
async retryPromise(makePromise) { | ||
@@ -20,0 +30,0 @@ let retryAttempts = 0; |
/// <reference types="node" /> | ||
/** | ||
* TS/JS representation of the [Fluentd EventTime](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format) type | ||
*/ | ||
declare class EventTime { | ||
epoch: number; | ||
nano: number; | ||
/** | ||
* The epoch of this EventTime (seconds since midnight, Jan 1st, 1970) | ||
*/ | ||
get epoch(): number; | ||
/** | ||
* The nano part of this EventTime (epoch + nano = timestamp nanos) | ||
*/ | ||
get nano(): number; | ||
private _epoch; | ||
private _nano; | ||
/** | ||
* Creates a new EventTime object | ||
* @param epoch The epoch (seconds since midnight, Jan 1st, 1970) | ||
* @param nano The nano part (epoch + nano = timestamp) | ||
*/ | ||
constructor(epoch: number, nano: number); | ||
/** | ||
* Packs the `EventTime` into a buffer | ||
* @internal | ||
* @param eventTime The `EventTime` object to pack | ||
* @returns The serialized `EventTime` | ||
*/ | ||
static pack(eventTime: EventTime): Buffer; | ||
/** | ||
* Unpacks an `EventTime` from a buffer | ||
* @internal | ||
* @param buffer The buffer to read the `EventTime` from | ||
* @returns The deserialized `EventTime`. | ||
*/ | ||
static unpack(buffer: Buffer): EventTime; | ||
/** | ||
* Returns the current timestamp as an `EventTime` | ||
* | ||
* Similar to `Date.now()` | ||
* @returns The EventTime representation of the current timestamp | ||
*/ | ||
static now(): EventTime; | ||
/** | ||
* Converts a `Date` to an `EventTime`. | ||
* | ||
* @param date The `Date` object to convert | ||
* @returns The equivalent `EventTime`. | ||
*/ | ||
static fromDate(date: Date): EventTime; | ||
/** | ||
* Creates a new `EventTime` from a numeric timestamp | ||
* | ||
* @param t The numeric timestamp to convert to an EventTime | ||
* @returns The EventTime representation of the timestamp | ||
*/ | ||
static fromTimestamp(t: number): EventTime; | ||
} | ||
export default EventTime; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
/** | ||
* TS/JS representation of the [Fluentd EventTime](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format) type | ||
*/ | ||
class EventTime { | ||
/** | ||
* Creates a new EventTime object | ||
* @param epoch The epoch (seconds since midnight, Jan 1st, 1970) | ||
* @param nano The nano part (epoch + nano = timestamp) | ||
*/ | ||
constructor(epoch, nano) { | ||
this.epoch = epoch; | ||
this.nano = nano; | ||
this._epoch = epoch; | ||
this._nano = nano; | ||
} | ||
/** | ||
* The epoch of this EventTime (seconds since midnight, Jan 1st, 1970) | ||
*/ | ||
get epoch() { | ||
return this._epoch; | ||
} | ||
/** | ||
* The nano part of this EventTime (epoch + nano = timestamp nanos) | ||
*/ | ||
get nano() { | ||
return this._nano; | ||
} | ||
/** | ||
* Packs the `EventTime` into a buffer | ||
* @internal | ||
* @param eventTime The `EventTime` object to pack | ||
* @returns The serialized `EventTime` | ||
*/ | ||
static pack(eventTime) { | ||
@@ -14,2 +40,8 @@ const b = Buffer.allocUnsafe(8); | ||
} | ||
/** | ||
* Unpacks an `EventTime` from a buffer | ||
* @internal | ||
* @param buffer The buffer to read the `EventTime` from | ||
* @returns The deserialized `EventTime`. | ||
*/ | ||
static unpack(buffer) { | ||
@@ -20,2 +52,8 @@ const e = buffer.readUInt32BE(0); | ||
} | ||
/** | ||
* Returns the current timestamp as an `EventTime` | ||
* | ||
* Similar to `Date.now()` | ||
* @returns The EventTime representation of the current timestamp | ||
*/ | ||
static now() { | ||
@@ -25,2 +63,8 @@ const now = Date.now(); | ||
} | ||
/** | ||
* Converts a `Date` to an `EventTime`. | ||
* | ||
* @param date The `Date` object to convert | ||
* @returns The equivalent `EventTime`. | ||
*/ | ||
static fromDate(date) { | ||
@@ -30,2 +74,8 @@ const t = date.getTime(); | ||
} | ||
/** | ||
* Creates a new `EventTime` from a numeric timestamp | ||
* | ||
* @param t The numeric timestamp to convert to an EventTime | ||
* @returns The EventTime representation of the timestamp | ||
*/ | ||
static fromTimestamp(t) { | ||
@@ -32,0 +82,0 @@ const epoch = Math.floor(t / 1000); |
export { FluentClient } from "./client"; | ||
export { FluentServer } from "./server"; | ||
export { default as EventTime } from "./event_time"; | ||
export type { FluentSocketOptions, ReconnectOptions } from "./socket"; | ||
export type { FluentAuthOptions } from "./auth"; | ||
export type { EventRetryOptions } from "./event_retrier"; | ||
export type { FluentClientOptions, Timestamp, AckOptions, EventModes, } from "./client"; | ||
export type { FluentServerOptions, FluentServerSecurityOptions } from "./server"; | ||
export * as FluentError from "./error"; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.EventTime = exports.FluentServer = exports.FluentClient = void 0; | ||
exports.FluentError = exports.EventTime = exports.FluentServer = exports.FluentClient = void 0; | ||
var client_1 = require("./client"); | ||
@@ -10,2 +10,3 @@ Object.defineProperty(exports, "FluentClient", { enumerable: true, get: function () { return client_1.FluentClient; } }); | ||
Object.defineProperty(exports, "EventTime", { enumerable: true, get: function () { return event_time_1.default; } }); | ||
exports.FluentError = require("./error"); | ||
//# sourceMappingURL=index.js.map |
@@ -7,11 +7,26 @@ import * as pDefer from "p-defer"; | ||
entries: protocol.Entry[]; | ||
size: number; | ||
deferred: pDefer.DeferredPromise<void>; | ||
}; | ||
/** | ||
* Implements the Forward specification's [Forward mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#forward-mode) | ||
*/ | ||
export declare class ForwardQueue extends Queue { | ||
/** | ||
* Maintain the queue as a Map | ||
* | ||
* JS guarantees maps are insertion ordered, so calling sendQueue.values().next.value will be the first tag to be inserted. | ||
*/ | ||
private sendQueue; | ||
/** | ||
* The total number of events stored within the queue | ||
* | ||
* Note that this isn't just sendQueue.size because each entry in the map can have multiple events | ||
*/ | ||
private sendQueueLength; | ||
/** | ||
* Size is not measured for this queue | ||
*/ | ||
get queueSize(): number; | ||
get queueLength(): number; | ||
push(tag: string, time: protocol.Time, data: protocol.EventRecord): Promise<void>; | ||
push(tag: protocol.Tag, time: protocol.Time, data: protocol.EventRecord): Promise<void>; | ||
pop(): ForwardRecord | null; | ||
@@ -18,0 +33,0 @@ nextPacket(chunk?: protocol.Chunk): PacketData | null; |
@@ -7,9 +7,24 @@ "use strict"; | ||
const protocol = require("../protocol"); | ||
/** | ||
* Implements the Forward specification's [Forward mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#forward-mode) | ||
*/ | ||
class ForwardQueue extends queue_1.Queue { | ||
constructor() { | ||
super(...arguments); | ||
/** | ||
* Maintain the queue as a Map | ||
* | ||
* JS guarantees maps are insertion ordered, so calling sendQueue.values().next.value will be the first tag to be inserted. | ||
*/ | ||
this.sendQueue = new Map(); | ||
/** | ||
* The total number of events stored within the queue | ||
* | ||
* Note that this isn't just sendQueue.size because each entry in the map can have multiple events | ||
*/ | ||
this.sendQueueLength = 0; | ||
} | ||
// Size is not measured for this queue | ||
/** | ||
* Size is not measured for this queue | ||
*/ | ||
get queueSize() { | ||
@@ -27,3 +42,2 @@ return -1; | ||
entryData.entries.push(entry); | ||
entryData.size += entry.length; | ||
return entryData.deferred.promise; | ||
@@ -36,3 +50,2 @@ } | ||
entries: [entry], | ||
size: entry.length, | ||
deferred: deferred, | ||
@@ -39,0 +52,0 @@ }); |
@@ -10,10 +10,24 @@ import * as pDefer from "p-defer"; | ||
}; | ||
/** | ||
* Implements the Forward specification's [Message mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#message-modes) | ||
*/ | ||
export declare class MessageQueue extends Queue { | ||
/** | ||
* Maintain the queue as a Set | ||
* | ||
* JS guarantees sets are insertion ordered, so calling sendQueue.values().next.value will be the first entry to be inserted. | ||
*/ | ||
private sendQueue; | ||
/** | ||
* Size is not measured for this queue | ||
*/ | ||
get queueSize(): number; | ||
/** | ||
* The length of the queue | ||
*/ | ||
get queueLength(): number; | ||
push(tag: string, time: protocol.Time, event: protocol.EventRecord): Promise<void>; | ||
pop(): EventRecord | null; | ||
push(tag: protocol.Tag, time: protocol.Time, event: protocol.EventRecord): Promise<void>; | ||
protected pop(): EventRecord | null; | ||
nextPacket(chunk?: protocol.Chunk): PacketData | null; | ||
} | ||
export {}; |
@@ -7,11 +7,24 @@ "use strict"; | ||
const protocol = require("../protocol"); | ||
/** | ||
* Implements the Forward specification's [Message mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#message-modes) | ||
*/ | ||
class MessageQueue extends queue_1.Queue { | ||
constructor() { | ||
super(...arguments); | ||
/** | ||
* Maintain the queue as a Set | ||
* | ||
* JS guarantees sets are insertion ordered, so calling sendQueue.values().next.value will be the first entry to be inserted. | ||
*/ | ||
this.sendQueue = new Set(); | ||
} | ||
// Size is not measured for this queue | ||
/** | ||
* Size is not measured for this queue | ||
*/ | ||
get queueSize() { | ||
return -1; | ||
} | ||
/** | ||
* The length of the queue | ||
*/ | ||
get queueLength() { | ||
@@ -18,0 +31,0 @@ return this.sendQueue.size; |
@@ -7,19 +7,53 @@ import * as pDefer from "p-defer"; | ||
entries: Uint8Array[]; | ||
/** | ||
* The total length of the buffers in entries | ||
* | ||
* Useful for concatenating them all together later | ||
*/ | ||
size: number; | ||
deferred: pDefer.DeferredPromise<void>; | ||
}; | ||
/** | ||
* Implements the Forward specification's [(Compressed)?PackedForward mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#packedforward-mode) | ||
* | ||
* Implements both in the same queue, because the only code difference is compression in `nextPacket`. | ||
* | ||
* Subclassed below into the correct modes. | ||
*/ | ||
declare class BasePackedForwardQueue extends Queue { | ||
/** | ||
* Maintain the queue as a Map | ||
* | ||
* JS guarantees maps are insertion ordered, so calling sendQueue.values().next.value will be the first tag to be inserted. | ||
*/ | ||
private sendQueue; | ||
/** | ||
* The total size of the buffers in the queue | ||
*/ | ||
private sendQueueSize; | ||
/** | ||
* The total number of events stored within the queue | ||
* | ||
* Note that this isn't just sendQueue.size because each entry in the map can have multiple events | ||
*/ | ||
private sendQueueLength; | ||
/** | ||
* Used to gate compression of `nextPacket`. The difference between PackedForward and CompressedPackedForward | ||
*/ | ||
protected compressed: boolean; | ||
get queueSize(): number; | ||
get queueLength(): number; | ||
push(tag: string, time: protocol.Time, data: protocol.EventRecord): Promise<void>; | ||
pop(): PackedRecord | null; | ||
push(tag: protocol.Tag, time: protocol.Time, data: protocol.EventRecord): Promise<void>; | ||
protected pop(): PackedRecord | null; | ||
nextPacket(chunk?: protocol.Chunk): PacketData | null; | ||
} | ||
/** | ||
* Implements the Forward specification's [PackedForward mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#packedforward-mode) | ||
*/ | ||
export declare class PackedForwardQueue extends BasePackedForwardQueue { | ||
protected compressed: boolean; | ||
} | ||
/** | ||
* Implements the Forward specification's [CompressedPackedForward mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#compressedpackedforward-mode) | ||
*/ | ||
export declare class CompressedPackedForwardQueue extends BasePackedForwardQueue { | ||
@@ -26,0 +60,0 @@ protected compressed: boolean; |
@@ -7,8 +7,31 @@ "use strict"; | ||
const protocol = require("../protocol"); | ||
/** | ||
* Implements the Forward specification's [(Compressed)?PackedForward mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#packedforward-mode) | ||
* | ||
* Implements both in the same queue, because the only code difference is compression in `nextPacket`. | ||
* | ||
* Subclassed below into the correct modes. | ||
*/ | ||
class BasePackedForwardQueue extends queue_1.Queue { | ||
constructor() { | ||
super(...arguments); | ||
/** | ||
* Maintain the queue as a Map | ||
* | ||
* JS guarantees maps are insertion ordered, so calling sendQueue.values().next.value will be the first tag to be inserted. | ||
*/ | ||
this.sendQueue = new Map(); | ||
/** | ||
* The total size of the buffers in the queue | ||
*/ | ||
this.sendQueueSize = 0; | ||
/** | ||
* The total number of events stored within the queue | ||
* | ||
* Note that this isn't just sendQueue.size because each entry in the map can have multiple events | ||
*/ | ||
this.sendQueueLength = 0; | ||
/** | ||
* Used to gate compression of `nextPacket`. The difference between PackedForward and CompressedPackedForward | ||
*/ | ||
this.compressed = false; | ||
@@ -71,2 +94,5 @@ } | ||
} | ||
/** | ||
* Implements the Forward specification's [PackedForward mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#packedforward-mode) | ||
*/ | ||
class PackedForwardQueue extends BasePackedForwardQueue { | ||
@@ -79,2 +105,5 @@ constructor() { | ||
exports.PackedForwardQueue = PackedForwardQueue; | ||
/** | ||
* Implements the Forward specification's [CompressedPackedForward mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#compressedpackedforward-mode) | ||
*/ | ||
class CompressedPackedForwardQueue extends BasePackedForwardQueue { | ||
@@ -81,0 +110,0 @@ constructor() { |
import * as protocol from "../protocol"; | ||
import { DeferredPromise } from "p-defer"; | ||
/** | ||
* Every queue must have this type of data | ||
* | ||
* For DropEntry and clear purposes | ||
*/ | ||
export declare type EntryData = { | ||
/** | ||
* A deferred promise to resolve or reject once the data is sent or dropped | ||
*/ | ||
deferred: DeferredPromise<void>; | ||
}; | ||
/** | ||
* A packet to send to the Fluentd server | ||
*/ | ||
export declare type PacketData = { | ||
/** | ||
* The data to send | ||
*/ | ||
packet: Uint8Array; | ||
/** | ||
* A deferred promise to resolve once the data is successfully sent | ||
*/ | ||
deferred: DeferredPromise<void>; | ||
}; | ||
/** | ||
* Exposes a send queue for various event modes. | ||
*/ | ||
export declare abstract class Queue { | ||
/** | ||
* The # of entries in the queue | ||
*/ | ||
abstract get queueLength(): number; | ||
/** | ||
* The total size of the queue | ||
*/ | ||
abstract get queueSize(): number; | ||
abstract push(tag: string, time: protocol.Time, data: protocol.EventRecord): Promise<void>; | ||
/** | ||
* Add an event to the queue | ||
* | ||
* @param tag | ||
* @param time | ||
* @param data | ||
* @returns A Promise which is resolved once the data is sent, or rejected if there is an error | ||
*/ | ||
abstract push(tag: protocol.Tag, time: protocol.Time, data: protocol.EventRecord): Promise<void>; | ||
/** | ||
* Returns the next packet to send from the queue | ||
* | ||
* @param chunk A Chunk ID to send along, for acknowledgements if enabled | ||
* | ||
*/ | ||
abstract nextPacket(chunk?: protocol.Chunk): PacketData | null; | ||
abstract pop(): EntryData | null; | ||
/** | ||
* Returns and removes the first packet from the queue | ||
* | ||
* Used to drop events from the queue | ||
* @returns An entry, or null if the queue is empty | ||
*/ | ||
protected abstract pop(): EntryData | null; | ||
/** | ||
* Drops the item at the front of the queue | ||
* | ||
* Handles rejecting the promises, etc | ||
* @returns void | ||
*/ | ||
dropEntry(): void; | ||
/** | ||
* Clears out the queue | ||
*/ | ||
clear(): void; | ||
} |
@@ -5,3 +5,12 @@ "use strict"; | ||
const error_1 = require("../error"); | ||
/** | ||
* Exposes a send queue for various event modes. | ||
*/ | ||
class Queue { | ||
/** | ||
* Drops the item at the front of the queue | ||
* | ||
* Handles rejecting the promises, etc | ||
* @returns void | ||
*/ | ||
dropEntry() { | ||
@@ -13,9 +22,12 @@ const entryData = this.pop(); | ||
else { | ||
entryData.deferred.reject(new error_1.DroppedError("Message was dropped due to size limits")); | ||
entryData.deferred.reject(new error_1.DroppedError("Message was dropped due to limits")); | ||
} | ||
} | ||
/** | ||
* Clears out the queue | ||
*/ | ||
clear() { | ||
let entryData; | ||
while ((entryData = this.pop()) !== null) { | ||
entryData.deferred.reject(new error_1.DroppedError("Message dropped due to queue shutdown")); | ||
entryData.deferred.reject(new error_1.ClearDroppedError("Message dropped due to queue shutdown")); | ||
} | ||
@@ -22,0 +34,0 @@ } |
@@ -109,4 +109,25 @@ import EventTime from "./event_time"; | ||
export declare const generatePing: (hostname: ClientHostname, sharedKeyInfo: SharedKeyInfo, authInfo?: ClientAuthInfo | undefined) => PingMessage; | ||
/** | ||
* Validates a PING message from the client | ||
* | ||
* Assumes a valid structure (isPing has been called) | ||
* | ||
* @param m The ping message to validate | ||
* @param serverHostname The hostname of the client | ||
* @param serverKeyInfo The key info known to the server | ||
* @param authInfo Authentication information to validate (optional, auth not required if missing) | ||
* @returns An object with the complete SharedKeyInfo | ||
* @throws Error on mismatches | ||
*/ | ||
export declare const checkPing: (m: PingMessage, serverHostname: ServerHostname, serverKeyInfo: ServerKeyInfo, authInfo?: ServerAuthInfo | undefined) => PingResult; | ||
export declare const generatePong: (hostname: ServerHostname, authenticated: Authenticated, reason: Reason, sharedKeyInfo?: SharedKeyInfo | undefined) => PongMessage; | ||
/** | ||
* Checks the PONG message from the server | ||
* | ||
* Assumes a valid structure (isPong has been called) | ||
* @param m The PONG message from the server to validate | ||
* @param clientHostname The client hostname | ||
* @param sharedKeyInfo The client shared key information | ||
* @throws Error on validation issues | ||
*/ | ||
export declare const checkPong: (m: PongMessage, clientHostname: ClientHostname, sharedKeyInfo: SharedKeyInfo) => void; | ||
@@ -122,4 +143,13 @@ export declare const generateAck: (ack: string) => AckMessage; | ||
entries: Entry[]; | ||
/** | ||
* The chunk from the transport message, if any, used for acks | ||
*/ | ||
chunk?: Chunk; | ||
}; | ||
/** | ||
* Parses a transport message from the client | ||
* | ||
* @param message The transport message to parse | ||
* @returns An object with the decoded entries from the object | ||
*/ | ||
export declare const parseTransport: (message: ClientTransportMessage) => DecodedEntries; | ||
@@ -132,5 +162,24 @@ export declare const encode: (item: any) => Uint8Array; | ||
export declare const encodeMessage: (item: ServerMessage | ClientMessage) => Uint8Array; | ||
/** | ||
* Decodes a stream of data from the client | ||
* | ||
* @param dataStream A Readable to read the data from | ||
* @returns An iterable of messages from the client, not type checked | ||
*/ | ||
export declare const decodeClientStream: (dataStream: Readable) => AsyncIterable<ClientMessage>; | ||
/** | ||
* Decodes a stream of data from the server | ||
* | ||
* @param dataStream A Readable to read the data from | ||
* @returns An iterable of messages from the server, not type checked | ||
*/ | ||
export declare const decodeServerStream: (dataStream: Readable) => AsyncIterable<ServerMessage>; | ||
/** | ||
* Decodes a sequence of entries from a Buffer | ||
* | ||
* Useful for PackedForward|CompressedPackedForward event modes | ||
* @param data The data to unpack | ||
* @returns The entries from the data | ||
*/ | ||
export declare const decodeEntries: (data: Uint8Array) => Entry[]; | ||
export {}; |
@@ -230,2 +230,14 @@ "use strict"; | ||
exports.generatePing = generatePing; | ||
/** | ||
* Validates a PING message from the client | ||
* | ||
* Assumes a valid structure (isPing has been called) | ||
* | ||
* @param m The ping message to validate | ||
* @param serverHostname The hostname of the client | ||
* @param serverKeyInfo The key info known to the server | ||
* @param authInfo Authentication information to validate (optional, auth not required if missing) | ||
* @returns An object with the complete SharedKeyInfo | ||
* @throws Error on mismatches | ||
*/ | ||
const checkPing = (m, serverHostname, serverKeyInfo, authInfo) => { | ||
@@ -275,2 +287,11 @@ // ['PING', self_hostname, shared_key_salt, sha512_hex(shared_key_salt + self_hostname + nonce + shared_key), username || '', sha512_hex(auth_salt + username + password) || ''] | ||
exports.generatePong = generatePong; | ||
/** | ||
* Checks the PONG message from the server | ||
* | ||
* Assumes a valid structure (isPong has been called) | ||
* @param m The PONG message from the server to validate | ||
* @param clientHostname The client hostname | ||
* @param sharedKeyInfo The client shared key information | ||
* @throws Error on validation issues | ||
*/ | ||
const checkPong = (m, clientHostname, sharedKeyInfo) => { | ||
@@ -345,2 +366,8 @@ // [ | ||
exports.generateEntry = generateEntry; | ||
/** | ||
* Parses a transport message from the client | ||
* | ||
* @param message The transport message to parse | ||
* @returns An object with the decoded entries from the object | ||
*/ | ||
const parseTransport = (message) => { | ||
@@ -387,2 +414,5 @@ if (exports.isMessageMode(message)) { | ||
exports.parseTransport = parseTransport; | ||
/** | ||
* The [EventTime](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format) ext code is 0 | ||
*/ | ||
const EVENT_TIME_EXT_TYPE = 0x00; | ||
@@ -411,3 +441,15 @@ const extensionCodec = new msgpack_1.ExtensionCodec(); | ||
}); | ||
/** | ||
* Creates a newEncoder | ||
* | ||
* We can't share these because we were running into strange bugs where the internal buffers were being overwritten | ||
* @returns A new Encoder to use | ||
*/ | ||
const encoder = () => new msgpack_1.Encoder(extensionCodec); | ||
/** | ||
* Creates a new Decoder | ||
* | ||
* We can't share these because we were running into strange bugs where the internal buffers were being overwritten | ||
* @returns A new Decoder to use | ||
*/ | ||
const decoder = () => new msgpack_1.Decoder(extensionCodec); | ||
@@ -438,2 +480,8 @@ const encode = (item) => { | ||
exports.encodeMessage = encodeMessage; | ||
/** | ||
* Decodes a stream of data from the client | ||
* | ||
* @param dataStream A Readable to read the data from | ||
* @returns An iterable of messages from the client, not type checked | ||
*/ | ||
const decodeClientStream = (dataStream) => { | ||
@@ -443,2 +491,8 @@ return decoder().decodeStream(dataStream); | ||
exports.decodeClientStream = decodeClientStream; | ||
/** | ||
* Decodes a stream of data from the server | ||
* | ||
* @param dataStream A Readable to read the data from | ||
* @returns An iterable of messages from the server, not type checked | ||
*/ | ||
const decodeServerStream = (dataStream) => { | ||
@@ -448,2 +502,9 @@ return decoder().decodeStream(dataStream); | ||
exports.decodeServerStream = decodeServerStream; | ||
/** | ||
* Decodes a sequence of entries from a Buffer | ||
* | ||
* Useful for PackedForward|CompressedPackedForward event modes | ||
* @param data The data to unpack | ||
* @returns The entries from the data | ||
*/ | ||
const decodeEntries = (data) => { | ||
@@ -455,3 +516,3 @@ const entries = Array.from(decoder().decodeMulti(data)); | ||
else { | ||
throw new error_1.DecodeError("Received invalid entries " + JSON.stringify(entries)); | ||
throw new error_1.DecodeError("Received invalid entries"); | ||
} | ||
@@ -458,0 +519,0 @@ }; |
@@ -6,14 +6,59 @@ /// <reference types="node" /> | ||
import * as protocol from "./protocol"; | ||
declare type FluentServerSecurityOptions = { | ||
/** | ||
* The server security hardening options | ||
*/ | ||
export declare type FluentServerSecurityOptions = { | ||
/** | ||
* The hostname of the server. Should be unique to this process | ||
*/ | ||
serverHostname: string; | ||
/** | ||
* The shared key to authenticate clients with | ||
*/ | ||
sharedKey: string; | ||
/** | ||
* Whether to use user authentication | ||
*/ | ||
authorize: boolean; | ||
/** | ||
* A dict of users to their passwords | ||
*/ | ||
userDict: Record<string, string>; | ||
}; | ||
declare type FluentServerOptions = { | ||
/** | ||
* The server setup options | ||
*/ | ||
export declare type FluentServerOptions = { | ||
/** | ||
* The security options. | ||
* | ||
* Defaults to undefined (no auth). | ||
*/ | ||
security?: FluentServerSecurityOptions; | ||
/** | ||
* Whether or not to keep the sockets alive. Sent in HELO, but currently ignored | ||
* | ||
* Defaults to false | ||
*/ | ||
keepalive?: boolean; | ||
/** | ||
* TLS setup options. | ||
* | ||
* See the [Node.js docs](https://nodejs.org/api/tls.html#tls_tls_createserver_options_secureconnectionlistener) for more info | ||
* | ||
* Defaults to undefined | ||
*/ | ||
tlsOptions?: tls.TlsOptions; | ||
/** | ||
* Socket listen options | ||
* | ||
* See the [Node.js docs](https://nodejs.org/api/net.html#net_server_listen_options_callback) for more info | ||
* | ||
* Defaults to {port: 0} | ||
*/ | ||
listenOptions?: net.ListenOptions; | ||
}; | ||
/** | ||
* A Fluent [Forward protocol](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1) compatible server | ||
*/ | ||
export declare class FluentServer extends EventEmitter { | ||
@@ -27,9 +72,37 @@ private _port; | ||
private listenOptions; | ||
/** | ||
* Creates a new server | ||
* | ||
* @param options The server connection options | ||
*/ | ||
constructor(options?: FluentServerOptions); | ||
/** | ||
* Handles a connection event on the server | ||
* @param socket | ||
*/ | ||
private handleConnection; | ||
onEntry(tag: string, time: protocol.Time, record: protocol.EventRecord): void; | ||
/** | ||
* Called for each entry received by the server. | ||
* | ||
* @param tag The tag of the entry | ||
* @param time The timestamp of the entry | ||
* @param record The entry record | ||
*/ | ||
protected onEntry(tag: protocol.Tag, time: protocol.Time, record: protocol.EventRecord): void; | ||
/** | ||
* Returns the port the server is currently listening on | ||
*/ | ||
get port(): number | undefined; | ||
/** | ||
* Start the server | ||
* | ||
* @returns A Promise which resolves once the server is listening | ||
*/ | ||
listen(): Promise<void>; | ||
/** | ||
* Shutdown the server | ||
* | ||
* @returns A Promise, which resolves once the server has fully shut down. | ||
*/ | ||
close(): Promise<void>; | ||
} | ||
export {}; |
@@ -10,9 +10,25 @@ "use strict"; | ||
const error_1 = require("./error"); | ||
/** | ||
* Manages the state of the client | ||
*/ | ||
var ClientState; | ||
(function (ClientState) { | ||
/** | ||
* Can receive events from this client | ||
*/ | ||
ClientState[ClientState["ESTABLISHED"] = 0] = "ESTABLISHED"; | ||
ClientState[ClientState["HELO"] = 1] = "HELO"; | ||
ClientState[ClientState["PING"] = 2] = "PING"; | ||
/** | ||
* Waiting for a PING from this client | ||
*/ | ||
ClientState[ClientState["PING"] = 1] = "PING"; | ||
})(ClientState || (ClientState = {})); | ||
/** | ||
* A Fluent [Forward protocol](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1) compatible server | ||
*/ | ||
class FluentServer extends EventEmitter { | ||
/** | ||
* Creates a new server | ||
* | ||
* @param options The server connection options | ||
*/ | ||
constructor(options = {}) { | ||
@@ -33,2 +49,6 @@ super(); | ||
} | ||
/** | ||
* Handles a connection event on the server | ||
* @param socket | ||
*/ | ||
async handleConnection(socket) { | ||
@@ -65,3 +85,3 @@ var _a; | ||
// Unexpected PONG when we didn't send a HELO | ||
throw new error_1.ResponseError("Unexpected PING"); | ||
throw new error_1.UnexpectedMessageError("Unexpected PING"); | ||
} | ||
@@ -98,3 +118,3 @@ try { | ||
else { | ||
throw new error_1.ResponseError("Unexpected message"); | ||
throw new error_1.UnexpectedMessageError("Unexpected message"); | ||
} | ||
@@ -108,8 +128,23 @@ } | ||
} | ||
/** | ||
* Called for each entry received by the server. | ||
* | ||
* @param tag The tag of the entry | ||
* @param time The timestamp of the entry | ||
* @param record The entry record | ||
*/ | ||
onEntry(tag, time, record) { | ||
this.emit("entry", tag, time, record); | ||
} | ||
/** | ||
* Returns the port the server is currently listening on | ||
*/ | ||
get port() { | ||
return this._port; | ||
} | ||
/** | ||
* Start the server | ||
* | ||
* @returns A Promise which resolves once the server is listening | ||
*/ | ||
listen() { | ||
@@ -125,2 +160,7 @@ return new Promise(resolve => { | ||
} | ||
/** | ||
* Shutdown the server | ||
* | ||
* @returns A Promise, which resolves once the server has fully shut down. | ||
*/ | ||
close() { | ||
@@ -127,0 +167,0 @@ return new Promise(resolve => { |
@@ -5,20 +5,91 @@ /// <reference types="node" /> | ||
import * as protocol from "./protocol"; | ||
declare type ReconnectOptions = { | ||
/** | ||
* Reconnection settings for the socket | ||
* | ||
* The parameters represent an exponential backoff formula: | ||
* min(maxDelay, max(minDelay, backoff^attempts * delay)) | ||
* | ||
* The attempt count is incremented each time we fail a connection, | ||
* and set to zero each time a connection is successfully made. | ||
* Note this is before handshaking | ||
*/ | ||
export declare type ReconnectOptions = { | ||
/** | ||
* The backoff factor for each attempt | ||
* | ||
* Defaults to 2 | ||
*/ | ||
backoff: number; | ||
/** | ||
* The delay factor for each attempt | ||
* | ||
* Defaults to 500 | ||
*/ | ||
delay: number; | ||
/** | ||
* The global minimum delay | ||
*/ | ||
minDelay: number; | ||
/** | ||
* The global maximum delay | ||
*/ | ||
maxDelay: number; | ||
}; | ||
export declare type FluentSocketOptions = { | ||
/** | ||
* If connecting to a unix domain socket, e.g unix:///var/run/fluentd.sock, then specify that here. | ||
* | ||
* This overrides host/port below. Defaults to `undefined`. | ||
*/ | ||
path?: string; | ||
/** | ||
* The host (IP) to connect to | ||
* | ||
* Defaults to `localhost`. | ||
*/ | ||
host?: string; | ||
/** | ||
* The port to connect to | ||
* | ||
* Defaults to `24224`. | ||
*/ | ||
port?: number; | ||
/** | ||
* The socket timeout to set. After timing out, the socket will be idle and reconnect once the client wants to write something. | ||
* | ||
* Defaults to 3000 (3 seconds) | ||
*/ | ||
timeout?: number; | ||
/** | ||
* TLS connection options. See [Node docs](https://nodejs.org/api/tls.html#tls_tls_connect_options_callback) | ||
* | ||
* If provided, the socket will be a TLS socket. | ||
*/ | ||
tls?: tls.ConnectionOptions; | ||
/** | ||
* Reconnection options. See subtype for defaults. | ||
*/ | ||
reconnect?: Partial<ReconnectOptions>; | ||
/** | ||
* Disable reconnection on failure. This can be useful for one-offs | ||
* | ||
* Defaults to false | ||
*/ | ||
disableReconnect?: boolean; | ||
}; | ||
/** | ||
* How to close the socket | ||
*/ | ||
export declare enum CloseState { | ||
/** | ||
* Make the socket unable to reconnect | ||
*/ | ||
FATAL = 0, | ||
/** | ||
* Allow the socket to reconnect manually | ||
*/ | ||
CLOSE = 1, | ||
/** | ||
* Allow the socket to reconnect automatically | ||
*/ | ||
RECONNECT = 2 | ||
@@ -42,15 +113,74 @@ } | ||
private reconnect; | ||
/** | ||
* Used so we can read from the socket through an AsyncIterable. | ||
* Protects the reader from accidentally closing the socket on errors. | ||
*/ | ||
private passThroughStream; | ||
/** | ||
* Creates a new socket | ||
* | ||
* @param options The socket connection options | ||
*/ | ||
constructor(options?: FluentSocketOptions); | ||
/** | ||
* Connects the socket to the upstream server | ||
* | ||
* @returns void | ||
*/ | ||
connect(): void; | ||
/** | ||
* May reconnect the socket | ||
* @returns void | ||
*/ | ||
private maybeReconnect; | ||
/** | ||
* Creates a new TLS socket | ||
* @returns A new socket to use for the connection | ||
*/ | ||
private createTlsSocket; | ||
/** | ||
* Creates a new TCP socket | ||
* @returns A new socket to use for the connection | ||
*/ | ||
private createTcpSocket; | ||
/** | ||
* Returns a new socket | ||
* | ||
* @param onConnect Called once the socket is connected | ||
* @returns | ||
*/ | ||
private createSocket; | ||
/** | ||
* Sets up the socket to be opened | ||
*/ | ||
private openSocket; | ||
/** | ||
* Called once the socket is connected | ||
*/ | ||
private handleConnect; | ||
/** | ||
* Processes messages from the socket | ||
* | ||
* @param iterable The socket read data stream | ||
* @returns Promise for when parsing completes | ||
*/ | ||
private processMessages; | ||
/** | ||
* Called from an error event on the socket | ||
*/ | ||
private handleError; | ||
/** | ||
* Called when the socket times out | ||
* Should suspend the socket (set it to IDLE) | ||
*/ | ||
private handleTimeout; | ||
/** | ||
* Called from a "close" event on the socket | ||
* | ||
* Should clean up the state, and potentially trigger a reconnect | ||
*/ | ||
private handleClose; | ||
/** | ||
* Called when the socket has fully drained, and the buffers are free again | ||
*/ | ||
private handleDrain; | ||
@@ -145,2 +275,1 @@ /** | ||
} | ||
export {}; |
@@ -12,15 +12,98 @@ "use strict"; | ||
(function (SocketState) { | ||
/** | ||
* In this state, the socket doesn't exist, and we can't read or write from it. | ||
* | ||
* No read | ||
* No write | ||
* Transitions to CONNECTING on call to connect() (potentially from maybeReconnect()) | ||
*/ | ||
SocketState[SocketState["DISCONNECTED"] = 0] = "DISCONNECTED"; | ||
/** | ||
* In this state we're working on making the connection (opening socket + TLS negotiations if any), but haven't finished. | ||
* | ||
* No read | ||
* No write | ||
* Transitions to DISCONNECTED on error | ||
* Transitions to CONNECTED on success | ||
*/ | ||
SocketState[SocketState["CONNECTING"] = 1] = "CONNECTING"; | ||
/** | ||
* In this state, we're doing some preparatory work before accepting writes | ||
* | ||
* Internal read | ||
* Internal write | ||
* Transitions to DISCONNECTED on soft error (will reconnect) | ||
* Transitions to DISCONNECTING on close + medium error | ||
* Transitions to FATAL on hard error | ||
*/ | ||
SocketState[SocketState["CONNECTED"] = 2] = "CONNECTED"; | ||
/** | ||
* In this state, we're fully open, and able to read and write to the socket | ||
* | ||
* Can read | ||
* Can write | ||
* Transitions to DISCONNECTED on soft error (will reconnect) | ||
* Transitions to DISCONNECTING on close + medium error | ||
* Transitions to FATAL on hard error | ||
* Tansitions to DRAINING when socket.write returns false (buffer full) | ||
* Transitions to IDLE on timeout | ||
*/ | ||
SocketState[SocketState["ESTABLISHED"] = 3] = "ESTABLISHED"; | ||
/** | ||
* In this state, the socket has blocked writes, as the kernel buffer is full. | ||
* | ||
* Can read | ||
* No write | ||
* Transitions to ESTABLISHED on drain event | ||
* Transitions to DISCONNECTED on soft error (will reconnect) | ||
* Transitions to DISCONNECTING on close + medium error | ||
* Transitions to FATAL on hard error | ||
*/ | ||
SocketState[SocketState["DRAINING"] = 4] = "DRAINING"; | ||
/** | ||
* In this state, the socket is being closed, and will not be reconnected, either as the result of user action, or an event. | ||
* | ||
* Can read | ||
* No write | ||
* Transitions to DISCONNECTED on close event | ||
*/ | ||
SocketState[SocketState["DISCONNECTING"] = 5] = "DISCONNECTING"; | ||
/** | ||
* In this state, the socket has timed out due to inactivity. It will be reconnected once the user calls `writable()`. | ||
* | ||
* We don't auto reconnect from this state, as the idle timeout indicates low event activity. | ||
* It can also potentially indicate a misconfiguration where the timeout is too low. | ||
* | ||
* No read | ||
* No write | ||
* Transitions to CONNECTING on call to connect() (potentially from writable()) | ||
*/ | ||
SocketState[SocketState["IDLE"] = 6] = "IDLE"; | ||
/** | ||
* In this state, the socket has run into a fatal error, which it believes there is no point in reconnecting. | ||
* | ||
* This is almost always a configuration misconfiguration, for example the server requires auth, but the client has no auth information. | ||
* | ||
* No read | ||
* No write | ||
* Does not transition | ||
*/ | ||
SocketState[SocketState["FATAL"] = 7] = "FATAL"; | ||
})(SocketState || (SocketState = {})); | ||
/** | ||
* How to close the socket | ||
*/ | ||
var CloseState; | ||
(function (CloseState) { | ||
/** | ||
* Make the socket unable to reconnect | ||
*/ | ||
CloseState[CloseState["FATAL"] = 0] = "FATAL"; | ||
/** | ||
* Allow the socket to reconnect manually | ||
*/ | ||
CloseState[CloseState["CLOSE"] = 1] = "CLOSE"; | ||
/** | ||
* Allow the socket to reconnect automatically | ||
*/ | ||
CloseState[CloseState["RECONNECT"] = 2] = "RECONNECT"; | ||
@@ -45,2 +128,7 @@ })(CloseState = exports.CloseState || (exports.CloseState = {})); | ||
class FluentSocket extends EventEmitter { | ||
/** | ||
* Creates a new socket | ||
* | ||
* @param options The socket connection options | ||
*/ | ||
constructor(options = {}) { | ||
@@ -52,2 +140,6 @@ super(); | ||
this.connectAttempts = 0; | ||
/** | ||
* Used so we can read from the socket through an AsyncIterable. | ||
* Protects the reader from accidentally closing the socket on errors. | ||
*/ | ||
this.passThroughStream = null; | ||
@@ -75,2 +167,7 @@ if (options.path) { | ||
} | ||
/** | ||
* Connects the socket to the upstream server | ||
* | ||
* @returns void | ||
*/ | ||
connect() { | ||
@@ -104,2 +201,6 @@ if (this.state === SocketState.FATAL) { | ||
} | ||
/** | ||
* May reconnect the socket | ||
* @returns void | ||
*/ | ||
maybeReconnect() { | ||
@@ -110,3 +211,3 @@ if (!this.reconnectEnabled || this.reconnectTimeoutId !== null) { | ||
if (this.state !== SocketState.DISCONNECTED) { | ||
// Socket is connected or in a fatal state | ||
// Socket is connected or in a fatal state or idle | ||
return; | ||
@@ -121,8 +222,22 @@ } | ||
} | ||
/** | ||
* Creates a new TLS socket | ||
* @returns A new socket to use for the connection | ||
*/ | ||
createTlsSocket() { | ||
return tls.connect({ ...this.tlsOptions, ...this.socketParams }); | ||
} | ||
/** | ||
* Creates a new TCP socket | ||
* @returns A new socket to use for the connection | ||
*/ | ||
createTcpSocket() { | ||
return net.createConnection({ ...this.socketParams, timeout: this.timeout }); | ||
} | ||
/** | ||
* Returns a new socket | ||
* | ||
* @param onConnect Called once the socket is connected | ||
* @returns | ||
*/ | ||
createSocket(onConnect) { | ||
@@ -140,2 +255,5 @@ if (this.tlsEnabled) { | ||
} | ||
/** | ||
* Sets up the socket to be opened | ||
*/ | ||
openSocket() { | ||
@@ -154,2 +272,5 @@ this.state = SocketState.CONNECTING; | ||
} | ||
/** | ||
* Called once the socket is connected | ||
*/ | ||
handleConnect() { | ||
@@ -161,2 +282,8 @@ this.connectAttempts = 0; | ||
} | ||
/** | ||
* Processes messages from the socket | ||
* | ||
* @param iterable The socket read data stream | ||
* @returns Promise for when parsing completes | ||
*/ | ||
async processMessages(iterable) { | ||
@@ -169,8 +296,15 @@ try { | ||
catch (e) { | ||
this.close(CloseState.FATAL, e); | ||
this.close(CloseState.CLOSE, e); | ||
} | ||
} | ||
/** | ||
* Called from an error event on the socket | ||
*/ | ||
handleError(error) { | ||
this.onError(error); | ||
} | ||
/** | ||
* Called when the socket times out | ||
* Should suspend the socket (set it to IDLE) | ||
*/ | ||
handleTimeout() { | ||
@@ -185,2 +319,7 @@ if (this.socket !== null && isConnected(this.state)) { | ||
} | ||
/** | ||
* Called from a "close" event on the socket | ||
* | ||
* Should clean up the state, and potentially trigger a reconnect | ||
*/ | ||
handleClose() { | ||
@@ -210,2 +349,5 @@ var _a; | ||
} | ||
/** | ||
* Called when the socket has fully drained, and the buffers are free again | ||
*/ | ||
handleDrain() { | ||
@@ -281,7 +423,7 @@ // We may not have noticed that we were draining, or we may have moved to a different state in the mean time | ||
else { | ||
this.close(CloseState.CLOSE, new error_1.ResponseError("Received unexpected message")); | ||
this.close(CloseState.CLOSE, new error_1.UnexpectedMessageError("Received unexpected message")); | ||
} | ||
} | ||
else { | ||
this.close(CloseState.CLOSE, new error_1.ResponseError("Received unexpected message")); | ||
this.close(CloseState.CLOSE, new error_1.UnexpectedMessageError("Received unexpected message")); | ||
} | ||
@@ -288,0 +430,0 @@ } |
{ | ||
"name": "fluentd-node", | ||
"version": "0.0.3", | ||
"version": "0.0.4", | ||
"description": "A fluent protocol implementation in node", | ||
@@ -50,2 +50,3 @@ "main": "./build/src/index.js", | ||
"sinon": "^11.1.1", | ||
"typedoc": "^0.21.0", | ||
"typescript": "^4.0.3" | ||
@@ -52,0 +53,0 @@ }, |
@@ -1,4 +0,7 @@ | ||
# fluent-logger for Node.js | ||
# fluentd-node | ||
[![Build Status](https://github.com/jamiees2/fluentd-node/actions/workflows/main.yml/badge.svg)](https://github.com/jamiees2/fluentd-node/actions) | ||
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) | ||
[![Docs](https://img.shields.io/badge/Docs-latest-green)](https://jamiees2.github.io/fluentd-node/) | ||
fluent-logger implementation for Node.js. | ||
[Fluent Forward Protocol](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1) implementation for Node.js. | ||
Built upon [fluent-logger-node](https://github.com/fluent/fluent-logger-node). | ||
@@ -8,3 +11,2 @@ | ||
[![Build Status](https://github.com/jamiees2/fluentd-node/actions/workflows/main.yml/badge.svg)](https://github.com/jamiees2/fluentd-node/actions) | ||
@@ -15,3 +17,3 @@ ## Install | ||
## Prerequistes | ||
## Prerequisites | ||
@@ -59,3 +61,3 @@ The fluent daemon should be listening on a TCP port. | ||
### Acknowledgements | ||
### Fluentd acknowledgements | ||
Fluentd provides explicit support for acknowledgements, which allow the client to be sure that the event reached its destination. | ||
@@ -266,2 +268,5 @@ | ||
### Client Options | ||
For a full list of the client options and methods, see the [FluentClient docs](https://jamiees2.github.io/fluentd-node/classes/fluentclient.html) | ||
### Server | ||
@@ -300,3 +305,5 @@ `fluentd-node` includes a fully functional forward server which can be used as a downstream Fluent sink. | ||
For a full list of the server options and methods, see the [FluentServer docs](https://jamiees2.github.io/fluentd-node/classes/fluentserver.html) | ||
## License | ||
@@ -303,0 +310,0 @@ |
import * as crypto from "crypto"; | ||
import {FluentSocket, FluentSocketOptions, CloseState} from "./socket"; | ||
import {ResponseError} from "./error"; | ||
import {UnexpectedMessageError} from "./error"; | ||
import * as protocol from "./protocol"; | ||
/** | ||
* The authentication options for the client | ||
*/ | ||
export type FluentAuthOptions = { | ||
/** | ||
* The client host name (required). | ||
* | ||
* Must be unique to this process | ||
*/ | ||
clientHostname: string; | ||
/** | ||
* The shared key with the server. (required) | ||
*/ | ||
sharedKey: string; | ||
/** | ||
* The username to authenticate with. (optional) | ||
*/ | ||
username?: string; | ||
/** | ||
* The password to authenticate with. (optional) | ||
*/ | ||
password?: string; | ||
@@ -14,8 +31,23 @@ }; | ||
enum FluentAuthState { | ||
/** | ||
* The client is not authenticated (socket is not connected) | ||
*/ | ||
UNAUTHENTICATED, | ||
/** | ||
* The client is waiting for a HELO from the server | ||
*/ | ||
HELO, | ||
/** | ||
* The client is waiting for a PONG from the server | ||
*/ | ||
PONG, | ||
/** | ||
* The client is fully authenticated | ||
*/ | ||
AUTHENTICATED, | ||
} | ||
/** | ||
* An implementation of FluentSocket which authenticates the socket using the [Forward protocol Handshake](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#handshake-messages) | ||
*/ | ||
export class FluentAuthSocket extends FluentSocket { | ||
@@ -29,2 +61,7 @@ private authState: FluentAuthState = FluentAuthState.UNAUTHENTICATED; | ||
/** | ||
* Creates a new instance of the socket | ||
* @param authOptions The authentication options to use | ||
* @param socketOptions The socket options to pass to the underlying socket | ||
*/ | ||
constructor( | ||
@@ -45,2 +82,5 @@ authOptions: FluentAuthOptions, | ||
/** | ||
* Once the socket is connected, we expect a HELO | ||
*/ | ||
protected onConnected(): void { | ||
@@ -50,2 +90,5 @@ this.authState = FluentAuthState.HELO; | ||
/** | ||
* When the socket is closed, we're unauthenticated | ||
*/ | ||
protected onClose(): void { | ||
@@ -56,2 +99,9 @@ this.authState = FluentAuthState.UNAUTHENTICATED; | ||
/** | ||
* Handles messages from the server | ||
* | ||
* If we're waiting for a message, this will trigger it, otherwise just forward to the superclass. | ||
* | ||
* @param message The message to check | ||
*/ | ||
protected onMessage(message: protocol.ServerMessage): void { | ||
@@ -70,3 +120,3 @@ if (protocol.isHelo(message) && this.authState === FluentAuthState.HELO) { | ||
CloseState.CLOSE, | ||
new ResponseError("Received unexpected message") | ||
new UnexpectedMessageError("Received unexpected message") | ||
); | ||
@@ -76,2 +126,9 @@ } | ||
/** | ||
* Called on a HELO message | ||
* | ||
* Should parse the message, and send back a PING | ||
* | ||
* @param message The HELO message | ||
*/ | ||
private handleHelo(message: protocol.HeloMessage): void { | ||
@@ -100,2 +157,10 @@ const heloOptions = protocol.parseHelo(message); | ||
/** | ||
* Called on a PONG message | ||
* | ||
* Should parse and validate the message, and if valid, establish the connection | ||
* | ||
* @param message The PONG message | ||
* @returns void | ||
*/ | ||
private handlePong(message: protocol.PongMessage): void { | ||
@@ -102,0 +167,0 @@ try { |
@@ -23,3 +23,20 @@ import { | ||
type EventModes = | ||
type AckData = { | ||
timeoutId: NodeJS.Timeout; | ||
deferred: DeferredPromise<void>; | ||
}; | ||
/** | ||
* The set of accepted event modes. See [Forward protocol spec](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#event-modes) | ||
* | ||
* `Message` will send each event to FluentD individually. | ||
* | ||
* `Forward` will collect the events together by tag, and send them together to FluentD in a single packet. | ||
* This is more efficient with a `flushInterval` to batch these together. | ||
* | ||
* `PackedForward` will behave the same as `Forward`, but will pack the events as part of entering the queue. This saves memory and bandwidth. | ||
* | ||
* `CompressedPackedForward` will behave the same as `PackedForward`, but additionally compress the items before emission, saving more bandwidth. | ||
*/ | ||
export type EventModes = | ||
| "Message" | ||
@@ -30,30 +47,102 @@ | "Forward" | ||
type Timestamp = number | Date | EventTime; | ||
/** | ||
* The set of accepted Timestamp values | ||
*/ | ||
export type Timestamp = number | Date | EventTime; | ||
type AckOptions = { | ||
/** | ||
* Acknowledgement settings | ||
*/ | ||
export type AckOptions = { | ||
/** | ||
* How long to wait for an acknowledgement from the server | ||
*/ | ||
ackTimeout: number; | ||
}; | ||
type AckData = { | ||
timeoutId: NodeJS.Timeout; | ||
deferred: DeferredPromise<void>; | ||
}; | ||
/** | ||
* The constructor options passed to the client | ||
*/ | ||
export type FluentClientOptions = { | ||
/** | ||
* The event mode to use. Defaults to PackedForward | ||
*/ | ||
eventMode?: EventModes; | ||
/** | ||
* The connection options. See subtype for defaults. | ||
*/ | ||
socket?: FluentSocketOptions; | ||
/** | ||
* The fluentd security options. See subtype for defaults. | ||
*/ | ||
security?: FluentAuthOptions; | ||
/** | ||
* Acknowledgement settings. | ||
*/ | ||
ack?: Partial<AckOptions>; | ||
/** | ||
* How long to wait to flush the queued events | ||
* | ||
* Defaults to 0 | ||
*/ | ||
flushInterval?: number; | ||
/** | ||
* The timestamp resolution of events passed to FluentD. | ||
* | ||
* Defaults to false (seconds). If true, the resolution will be in milliseconds | ||
*/ | ||
milliseconds?: boolean; | ||
/** | ||
* The limit at which the queue needs to be flushed. | ||
* | ||
* This checks the memory size of the queue, which is only useful with `PackedForward` and `PackedForwardCompressed`. | ||
* | ||
* Useful with flushInterval to limit the size of the queue | ||
*/ | ||
sendQueueFlushSize?: number; | ||
/** | ||
* The limit at which the queue needs to be flushed. | ||
* | ||
* This checks the number of events in the queue, which is useful with all event modes. | ||
* | ||
* Useful with flushInterval to limit the size of the queue | ||
*/ | ||
sendQueueFlushLength?: number; | ||
/** | ||
* The limit at which we start dropping events | ||
* | ||
* This checks the memory size of the queue, which is only useful with `PackedForward` and `PackedForwardCompressed`. | ||
* | ||
* Prevents the queue from growing to an unbounded size and exhausting memory. | ||
*/ | ||
sendQueueMaxSize?: number; | ||
/** | ||
* The limit at which we start dropping events | ||
* | ||
* This checks the number of events in the queue, which is useful with all event modes. | ||
* | ||
* Prevents the queue from growing to an unbounded size and exhausting memory. | ||
*/ | ||
sendQueueMaxLength?: number; | ||
/** | ||
* An error handler which will receive socket error events | ||
* | ||
* Useful for logging, these will be handled internally | ||
*/ | ||
onSocketError?: (err: Error) => void; | ||
/** | ||
* Retry event submission on failure | ||
* | ||
* Warning: This effectively keeps the event in memory until it is successfully sent or retries exhausted | ||
* | ||
* See subtype for defaults | ||
*/ | ||
eventRetry?: Partial<EventRetryOptions>; | ||
}; | ||
/** | ||
* A Fluent Client. Connects to a FluentD server using the [Forward protocol](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1). | ||
*/ | ||
export class FluentClient { | ||
private tag_prefix: string; | ||
private tag_prefix: string | null; | ||
private eventMode: EventModes; | ||
@@ -80,3 +169,12 @@ private ackEnabled: boolean; | ||
constructor(tag_prefix: string, options: FluentClientOptions = {}) { | ||
/** | ||
* Creates a new FluentClient | ||
* | ||
* @param tag_prefix A prefix to prefix to all tags. For example, passing the prefix "foo" will cause `emit("bar", data)` to emit with `foo.bar`. | ||
* @param options The client options | ||
*/ | ||
constructor( | ||
tag_prefix: string | null = null, | ||
options: FluentClientOptions = {} | ||
) { | ||
options = options || {}; | ||
@@ -111,6 +209,4 @@ this.eventMode = options.eventMode || "PackedForward"; | ||
this.sendQueueFlushLength = options.sendQueueFlushLength || +Infinity; | ||
this.sendQueueMaxSize = | ||
options.sendQueueMaxSize || 2 * this.sendQueueFlushSize; | ||
this.sendQueueMaxLength = | ||
options.sendQueueMaxLength || 2 * this.sendQueueFlushSize; | ||
this.sendQueueMaxSize = options.sendQueueMaxSize || +Infinity; | ||
this.sendQueueMaxLength = options.sendQueueMaxLength || +Infinity; | ||
@@ -128,2 +224,9 @@ this.socket = this.createSocket(options.security, options.socket); | ||
/** | ||
* Constructs a new socket | ||
* | ||
* @param security The security options, if any | ||
* @param options The socket options, if any | ||
* @returns A new FluentSocket | ||
*/ | ||
private createSocket( | ||
@@ -140,5 +243,37 @@ security?: FluentAuthOptions, | ||
/** | ||
* Emits an event to the Fluent Server | ||
* | ||
* @param data The event to emit (required) | ||
* @returns A Promise, which resolves when the event is successfully sent to the server. | ||
* Enabling acknowledgements waits until the server indicates they have received the event. | ||
*/ | ||
emit(data: protocol.EventRecord): Promise<void>; | ||
/** | ||
* Emits an event to the Fluent Server | ||
* | ||
* @param data The event to emit (required) | ||
* @param timestamp The timestamp of the event (optional) | ||
* @returns A Promise, which resolves when the event is successfully sent to the server. | ||
* Enabling acknowledgements waits until the server indicates they have received the event. | ||
*/ | ||
emit(data: protocol.EventRecord, timestamp: Timestamp): Promise<void>; | ||
/** | ||
* Emits an event to the Fluent Server | ||
* | ||
* @param label The label to emit the data with (optional) | ||
* @param data The event to emit (required) | ||
* @returns A Promise, which resolves when the event is successfully sent to the server. | ||
* Enabling acknowledgements waits until the server indicates they have received the event. | ||
*/ | ||
emit(label: string, data: protocol.EventRecord): Promise<void>; | ||
/** | ||
* Emits an event to the Fluent Server | ||
* | ||
* @param label The label to emit the data with (optional) | ||
* @param data The event to emit (required) | ||
* @param timestamp The timestamp of the event (optional) | ||
* @returns A Promise, which resolves when the event is successfully sent to the server. | ||
* Enabling acknowledgements waits until the server indicates they have received the event. | ||
*/ | ||
emit( | ||
@@ -213,9 +348,19 @@ label: string, | ||
if (this.retrier !== null) { | ||
return this.retrier.retryPromise(() => this.sendEvent(tag, time, data)); | ||
return this.retrier.retryPromise(() => this.pushEvent(tag, time, data)); | ||
} else { | ||
return this.sendEvent(tag, time, data); | ||
return this.pushEvent(tag, time, data); | ||
} | ||
} | ||
private sendEvent( | ||
/** | ||
* Pushes an event onto the sendQueue | ||
* | ||
* Also drops items from the queue if it is too large (size/length) | ||
* | ||
* @param tag The event tag | ||
* @param time The event timestamp | ||
* @param data The event data | ||
* @returns The promise from the sendQueue | ||
*/ | ||
private pushEvent( | ||
tag: protocol.Tag, | ||
@@ -240,2 +385,7 @@ time: protocol.Time, | ||
/** | ||
* Called once the underlying socket is writable | ||
* | ||
* Should attempt a flush | ||
*/ | ||
private handleWritable() { | ||
@@ -245,2 +395,5 @@ this.maybeFlush(); | ||
/** | ||
* Connects the client. Happens automatically during construction, but can be called after a `disconnect()` to resume the client. | ||
*/ | ||
public connect() { | ||
@@ -278,2 +431,8 @@ this.socket.connect(); | ||
/** | ||
* Creates a tag from the passed label and the constructor `tagPrefix`. | ||
* | ||
* @param label The label to create a tag from | ||
* @returns The constructed tag, or `null`. | ||
*/ | ||
private makeTag(label: string | null): string | null { | ||
@@ -291,2 +450,8 @@ let tag = null; | ||
/** | ||
* Flushes to the socket internally | ||
* | ||
* Managed by `flush` to not be called multiple times | ||
* @returns true if there are more events in the queue to flush, false otherwise | ||
*/ | ||
private innerFlush(): boolean { | ||
@@ -318,2 +483,7 @@ if (this.sendQueue.queueLength === 0) { | ||
/** | ||
* Flushes the event queue. Queues up the flushes for the next tick, preventing multiple flushes at the same time. | ||
* | ||
* @returns A promise, which resolves with a boolean indicating if there are more events to flush. | ||
*/ | ||
public flush(): Promise<boolean> { | ||
@@ -332,2 +502,10 @@ // Prevent duplicate flushes next tick | ||
/** | ||
* Potentially triggers a flush | ||
* | ||
* If we're flushing on an interval, check if the queue (size/length) limits have been reached, and otherwise schedule a new flush | ||
* | ||
* If not, just flush | ||
* @returns | ||
*/ | ||
private maybeFlush(): void { | ||
@@ -368,2 +546,6 @@ // nothing to flush | ||
/** | ||
* Send the front item of the queue to the socket | ||
* @returns True if there was something to send | ||
*/ | ||
private sendNext(): boolean { | ||
@@ -412,2 +594,10 @@ let chunk: protocol.Chunk | undefined; | ||
/** | ||
* Creates an event for how long to wait for the ack | ||
* | ||
* @param chunkId The chunk ID we're waiting to ack | ||
* @param deferred The deferred to reject on timeout | ||
* @param ackTimeout The timeout length | ||
* @returns | ||
*/ | ||
private setupAckTimeout( | ||
@@ -419,9 +609,16 @@ chunkId: string, | ||
return setTimeout(() => { | ||
// If the chunk isn't in the queue, then we must have removed it somewhere, assume that it didn't time out | ||
if (this.ackQueue.has(chunkId)) { | ||
deferred.reject(new AckTimeoutError("ack response timeout")); | ||
this.ackQueue.delete(chunkId); | ||
} | ||
deferred.reject(new AckTimeoutError("ack response timeout")); | ||
}, ackTimeout); | ||
} | ||
/** | ||
* Called on an acknowledgement from the socket | ||
* | ||
* @param chunkId The chunk ID the socket has acknowledged | ||
* @returns | ||
*/ | ||
private handleAck(chunkId: string): void { | ||
@@ -440,2 +637,6 @@ if (!this.ackQueue.has(chunkId)) { | ||
/** | ||
* Fails all acknowledgements | ||
* Called on shutdown | ||
*/ | ||
private clearAcks(): void { | ||
@@ -442,0 +643,0 @@ for (const data of this.ackQueue.values()) { |
@@ -10,2 +10,5 @@ class BaseError extends Error { | ||
/** | ||
* Thrown on configuration errors, e.g bad event modes | ||
*/ | ||
export class ConfigError extends BaseError { | ||
@@ -17,2 +20,5 @@ constructor(message: string) { | ||
/** | ||
* Thrown when an event is emitted without either passing the tag to `client.emit` or providing a `tag_prefix`. | ||
*/ | ||
export class MissingTagError extends BaseError { | ||
@@ -24,3 +30,6 @@ constructor(message: string) { | ||
export class ResponseError extends BaseError { | ||
/** | ||
* Thrown when a client/server receives an unexpected/invalid message | ||
*/ | ||
export class UnexpectedMessageError extends BaseError { | ||
constructor(message: string) { | ||
@@ -31,2 +40,5 @@ super(message); | ||
/** | ||
* Thrown when a client waiting for an acknowledgement times out | ||
*/ | ||
export class AckTimeoutError extends BaseError { | ||
@@ -38,2 +50,5 @@ constructor(message: string) { | ||
/** | ||
* Thrown when a client tries to emit an invalid data format, e.g string as timestamp, string event, etc. | ||
*/ | ||
export class DataTypeError extends BaseError { | ||
@@ -45,2 +60,5 @@ constructor(message: string) { | ||
/** | ||
* Thrown when a socket times out, but in a weird state. | ||
*/ | ||
export class SocketTimeoutError extends BaseError { | ||
@@ -52,2 +70,5 @@ constructor(message: string) { | ||
/** | ||
* Thrown when a client tries to write to a socket, but the socket is not writable | ||
*/ | ||
export class SocketNotWritableError extends BaseError { | ||
@@ -59,2 +80,5 @@ constructor(message: string) { | ||
/** | ||
* Thrown when an event is dropped for any reason | ||
*/ | ||
export class DroppedError extends BaseError { | ||
@@ -66,2 +90,14 @@ constructor(message: string) { | ||
/** | ||
* Thrown when an event is dropped as a result of clearing out the queue | ||
*/ | ||
export class ClearDroppedError extends DroppedError { | ||
constructor(message: string) { | ||
super(message); | ||
} | ||
} | ||
/** | ||
* Thrown when authentication fails on either the client or the server | ||
*/ | ||
export class AuthError extends BaseError { | ||
@@ -73,2 +109,5 @@ constructor(message: string) { | ||
/** | ||
* Thrown into the ack queue on shutdown, terminating all promises waiting for an ack | ||
*/ | ||
export class ShutdownError extends BaseError { | ||
@@ -80,3 +119,6 @@ constructor(message: string) { | ||
export class SharedKeyMismatchError extends BaseError { | ||
/** | ||
* Thrown when the shared key doesn't match | ||
*/ | ||
export class SharedKeyMismatchError extends AuthError { | ||
constructor(message: string) { | ||
@@ -87,2 +129,5 @@ super(message); | ||
/** | ||
* Thrown when a message could not be decoded properly | ||
*/ | ||
export class DecodeError extends BaseError { | ||
@@ -94,2 +139,5 @@ constructor(message: string) { | ||
/** | ||
* Thrown when trying to call `connect()` on a fatal socket | ||
*/ | ||
export class FatalSocketError extends BaseError { | ||
@@ -96,0 +144,0 @@ constructor(message: string) { |
import {DroppedError} from "./error"; | ||
/** | ||
* Event retry settings | ||
* | ||
* The parameters represent an exponential backoff formula: | ||
* min(maxDelay, max(minDelay, backoff^attempts * delay)) | ||
*/ | ||
export type EventRetryOptions = { | ||
/** | ||
* How often we retry each event | ||
* | ||
* Defaults to 4 | ||
*/ | ||
attempts: number; | ||
/** | ||
* The backoff factor for each attempt | ||
* | ||
* Defaults to 2 | ||
*/ | ||
backoff: number; | ||
/** | ||
* The delay factor for each attempt | ||
* | ||
* Defaults to 100 | ||
*/ | ||
delay: number; | ||
/** | ||
* The global minimum delay | ||
*/ | ||
minDelay: number; | ||
/** | ||
* The global maximum delay | ||
*/ | ||
maxDelay: number; | ||
/** | ||
* Called with each error | ||
* | ||
* Can be used for logging, or if the error is non-retryable, this callback can `throw` the error to short circuit the callback. | ||
*/ | ||
onError: (err: Error) => void; | ||
}; | ||
/** | ||
* Provides retry logic for a promise, with failure cases | ||
*/ | ||
export class EventRetrier { | ||
@@ -26,2 +61,9 @@ private options: EventRetryOptions; | ||
/** | ||
* Retry the promise | ||
* | ||
* Attempts the promise in an infinite loop, and retries according to the logic in EventRetryOptions | ||
* @param makePromise An async function to retry | ||
* @returns A Promise which succeeds if the async function succeeds, or has exhausted retry attempts | ||
*/ | ||
public async retryPromise<T>(makePromise: () => Promise<T>): Promise<T> { | ||
@@ -28,0 +70,0 @@ let retryAttempts = 0; |
@@ -0,10 +1,37 @@ | ||
/** | ||
* TS/JS representation of the [Fluentd EventTime](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format) type | ||
*/ | ||
class EventTime { | ||
epoch: number; | ||
nano: number; | ||
/** | ||
* The epoch of this EventTime (seconds since midnight, Jan 1st, 1970) | ||
*/ | ||
public get epoch(): number { | ||
return this._epoch; | ||
} | ||
/** | ||
* The nano part of this EventTime (epoch + nano = timestamp nanos) | ||
*/ | ||
public get nano(): number { | ||
return this._nano; | ||
} | ||
private _epoch: number; | ||
private _nano: number; | ||
/** | ||
* Creates a new EventTime object | ||
* @param epoch The epoch (seconds since midnight, Jan 1st, 1970) | ||
* @param nano The nano part (epoch + nano = timestamp) | ||
*/ | ||
constructor(epoch: number, nano: number) { | ||
this.epoch = epoch; | ||
this.nano = nano; | ||
this._epoch = epoch; | ||
this._nano = nano; | ||
} | ||
/** | ||
* Packs the `EventTime` into a buffer | ||
* @internal | ||
* @param eventTime The `EventTime` object to pack | ||
* @returns The serialized `EventTime` | ||
*/ | ||
static pack(eventTime: EventTime): Buffer { | ||
@@ -17,2 +44,8 @@ const b = Buffer.allocUnsafe(8); | ||
/** | ||
* Unpacks an `EventTime` from a buffer | ||
* @internal | ||
* @param buffer The buffer to read the `EventTime` from | ||
* @returns The deserialized `EventTime`. | ||
*/ | ||
static unpack(buffer: Buffer): EventTime { | ||
@@ -24,2 +57,8 @@ const e = buffer.readUInt32BE(0); | ||
/** | ||
* Returns the current timestamp as an `EventTime` | ||
* | ||
* Similar to `Date.now()` | ||
* @returns The EventTime representation of the current timestamp | ||
*/ | ||
static now(): EventTime { | ||
@@ -30,2 +69,8 @@ const now = Date.now(); | ||
/** | ||
* Converts a `Date` to an `EventTime`. | ||
* | ||
* @param date The `Date` object to convert | ||
* @returns The equivalent `EventTime`. | ||
*/ | ||
static fromDate(date: Date): EventTime { | ||
@@ -36,2 +81,8 @@ const t = date.getTime(); | ||
/** | ||
* Creates a new `EventTime` from a numeric timestamp | ||
* | ||
* @param t The numeric timestamp to convert to an EventTime | ||
* @returns The EventTime representation of the timestamp | ||
*/ | ||
static fromTimestamp(t: number): EventTime { | ||
@@ -38,0 +89,0 @@ const epoch = Math.floor(t / 1000); |
export {FluentClient} from "./client"; | ||
export {FluentServer} from "./server"; | ||
export {default as EventTime} from "./event_time"; | ||
export type {FluentSocketOptions, ReconnectOptions} from "./socket"; | ||
export type {FluentAuthOptions} from "./auth"; | ||
export type {EventRetryOptions} from "./event_retrier"; | ||
export type { | ||
FluentClientOptions, | ||
Timestamp, | ||
AckOptions, | ||
EventModes, | ||
} from "./client"; | ||
export type {FluentServerOptions, FluentServerSecurityOptions} from "./server"; | ||
export * as FluentError from "./error"; |
@@ -8,11 +8,25 @@ import * as pDefer from "p-defer"; | ||
entries: protocol.Entry[]; | ||
size: number; | ||
deferred: pDefer.DeferredPromise<void>; | ||
}; | ||
/** | ||
* Implements the Forward specification's [Forward mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#forward-mode) | ||
*/ | ||
export class ForwardQueue extends Queue { | ||
/** | ||
* Maintain the queue as a Map | ||
* | ||
* JS guarantees maps are insertion ordered, so calling sendQueue.values().next.value will be the first tag to be inserted. | ||
*/ | ||
private sendQueue: Map<protocol.Tag, ForwardRecord> = new Map(); | ||
/** | ||
* The total number of events stored within the queue | ||
* | ||
* Note that this isn't just sendQueue.size because each entry in the map can have multiple events | ||
*/ | ||
private sendQueueLength = 0; | ||
// Size is not measured for this queue | ||
/** | ||
* Size is not measured for this queue | ||
*/ | ||
get queueSize(): number { | ||
@@ -27,3 +41,3 @@ return -1; | ||
public push( | ||
tag: string, | ||
tag: protocol.Tag, | ||
time: protocol.Time, | ||
@@ -37,3 +51,2 @@ data: protocol.EventRecord | ||
entryData.entries.push(entry); | ||
entryData.size += entry.length; | ||
return entryData.deferred.promise; | ||
@@ -45,3 +58,2 @@ } else { | ||
entries: [entry], | ||
size: entry.length, | ||
deferred: deferred, | ||
@@ -48,0 +60,0 @@ }); |
@@ -12,6 +12,16 @@ import * as pDefer from "p-defer"; | ||
/** | ||
* Implements the Forward specification's [Message mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#message-modes) | ||
*/ | ||
export class MessageQueue extends Queue { | ||
/** | ||
* Maintain the queue as a Set | ||
* | ||
* JS guarantees sets are insertion ordered, so calling sendQueue.values().next.value will be the first entry to be inserted. | ||
*/ | ||
private sendQueue: Set<EventRecord> = new Set(); | ||
// Size is not measured for this queue | ||
/** | ||
* Size is not measured for this queue | ||
*/ | ||
get queueSize(): number { | ||
@@ -21,2 +31,5 @@ return -1; | ||
/** | ||
* The length of the queue | ||
*/ | ||
get queueLength(): number { | ||
@@ -27,3 +40,3 @@ return this.sendQueue.size; | ||
public push( | ||
tag: string, | ||
tag: protocol.Tag, | ||
time: protocol.Time, | ||
@@ -42,3 +55,3 @@ event: protocol.EventRecord | ||
public pop(): EventRecord | null { | ||
protected pop(): EventRecord | null { | ||
if (this.sendQueue.size === 0) { | ||
@@ -45,0 +58,0 @@ return null; |
@@ -8,2 +8,7 @@ import * as pDefer from "p-defer"; | ||
entries: Uint8Array[]; | ||
/** | ||
* The total length of the buffers in entries | ||
* | ||
* Useful for concatenating them all together later | ||
*/ | ||
size: number; | ||
@@ -13,6 +18,29 @@ deferred: pDefer.DeferredPromise<void>; | ||
/** | ||
* Implements the Forward specification's [(Compressed)?PackedForward mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#packedforward-mode) | ||
* | ||
* Implements both in the same queue, because the only code difference is compression in `nextPacket`. | ||
* | ||
* Subclassed below into the correct modes. | ||
*/ | ||
class BasePackedForwardQueue extends Queue { | ||
/** | ||
* Maintain the queue as a Map | ||
* | ||
* JS guarantees maps are insertion ordered, so calling sendQueue.values().next.value will be the first tag to be inserted. | ||
*/ | ||
private sendQueue: Map<protocol.Tag, PackedRecord> = new Map(); | ||
/** | ||
* The total size of the buffers in the queue | ||
*/ | ||
private sendQueueSize = 0; | ||
/** | ||
* The total number of events stored within the queue | ||
* | ||
* Note that this isn't just sendQueue.size because each entry in the map can have multiple events | ||
*/ | ||
private sendQueueLength = 0; | ||
/** | ||
* Used to gate compression of `nextPacket`. The difference between PackedForward and CompressedPackedForward | ||
*/ | ||
protected compressed = false; | ||
@@ -29,3 +57,3 @@ | ||
public push( | ||
tag: string, | ||
tag: protocol.Tag, | ||
time: protocol.Time, | ||
@@ -54,3 +82,3 @@ data: protocol.EventRecord | ||
public pop(): PackedRecord | null { | ||
protected pop(): PackedRecord | null { | ||
if (this.sendQueue.size === 0) { | ||
@@ -95,2 +123,5 @@ return null; | ||
/** | ||
* Implements the Forward specification's [PackedForward mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#packedforward-mode) | ||
*/ | ||
export class PackedForwardQueue extends BasePackedForwardQueue { | ||
@@ -100,4 +131,7 @@ protected compressed = false; | ||
/** | ||
* Implements the Forward specification's [CompressedPackedForward mode](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#compressedpackedforward-mode) | ||
*/ | ||
export class CompressedPackedForwardQueue extends BasePackedForwardQueue { | ||
protected compressed = true; | ||
} |
import * as protocol from "../protocol"; | ||
import {DeferredPromise} from "p-defer"; | ||
import {DroppedError} from "../error"; | ||
import {ClearDroppedError, DroppedError} from "../error"; | ||
/** | ||
* Every queue must have this type of data | ||
* | ||
* For DropEntry and clear purposes | ||
*/ | ||
export type EntryData = { | ||
/** | ||
* A deferred promise to resolve or reject once the data is sent or dropped | ||
*/ | ||
deferred: DeferredPromise<void>; | ||
}; | ||
/** | ||
* A packet to send to the Fluentd server | ||
*/ | ||
export type PacketData = { | ||
/** | ||
* The data to send | ||
*/ | ||
packet: Uint8Array; | ||
/** | ||
* A deferred promise to resolve once the data is successfully sent | ||
*/ | ||
deferred: DeferredPromise<void>; | ||
}; | ||
/** | ||
* Exposes a send queue for various event modes. | ||
*/ | ||
export abstract class Queue { | ||
/** | ||
* The # of entries in the queue | ||
*/ | ||
public abstract get queueLength(): number; | ||
/** | ||
* The total size of the queue | ||
*/ | ||
public abstract get queueSize(): number; | ||
/** | ||
* Add an event to the queue | ||
* | ||
* @param tag | ||
* @param time | ||
* @param data | ||
* @returns A Promise which is resolved once the data is sent, or rejected if there is an error | ||
*/ | ||
public abstract push( | ||
tag: string, | ||
tag: protocol.Tag, | ||
time: protocol.Time, | ||
data: protocol.EventRecord | ||
): Promise<void>; | ||
/** | ||
* Returns the next packet to send from the queue | ||
* | ||
* @param chunk A Chunk ID to send along, for acknowledgements if enabled | ||
* | ||
*/ | ||
public abstract nextPacket(chunk?: protocol.Chunk): PacketData | null; | ||
public abstract pop(): EntryData | null; | ||
/** | ||
* Returns and removes the first packet from the queue | ||
* | ||
* Used to drop events from the queue | ||
* @returns An entry, or null if the queue is empty | ||
*/ | ||
protected abstract pop(): EntryData | null; | ||
/** | ||
* Drops the item at the front of the queue | ||
* | ||
* Handles rejecting the promises, etc | ||
* @returns void | ||
*/ | ||
public dropEntry(): void { | ||
@@ -31,3 +83,3 @@ const entryData = this.pop(); | ||
entryData.deferred.reject( | ||
new DroppedError("Message was dropped due to size limits") | ||
new DroppedError("Message was dropped due to limits") | ||
); | ||
@@ -37,2 +89,5 @@ } | ||
/** | ||
* Clears out the queue | ||
*/ | ||
public clear(): void { | ||
@@ -42,3 +97,3 @@ let entryData: EntryData | null; | ||
entryData.deferred.reject( | ||
new DroppedError("Message dropped due to queue shutdown") | ||
new ClearDroppedError("Message dropped due to queue shutdown") | ||
); | ||
@@ -45,0 +100,0 @@ } |
@@ -15,2 +15,4 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ | ||
// Types from the [Forward protocol](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1) | ||
export type SharedKey = string; | ||
@@ -412,2 +414,14 @@ export type SharedKeyNonce = string | Uint8Array; | ||
/** | ||
* Validates a PING message from the client | ||
* | ||
* Assumes a valid structure (isPing has been called) | ||
* | ||
* @param m The ping message to validate | ||
* @param serverHostname The hostname of the client | ||
* @param serverKeyInfo The key info known to the server | ||
* @param authInfo Authentication information to validate (optional, auth not required if missing) | ||
* @returns An object with the complete SharedKeyInfo | ||
* @throws Error on mismatches | ||
*/ | ||
export const checkPing = ( | ||
@@ -469,2 +483,11 @@ m: PingMessage, | ||
/** | ||
* Checks the PONG message from the server | ||
* | ||
* Assumes a valid structure (isPong has been called) | ||
* @param m The PONG message from the server to validate | ||
* @param clientHostname The client hostname | ||
* @param sharedKeyInfo The client shared key information | ||
* @throws Error on validation issues | ||
*/ | ||
export const checkPong = ( | ||
@@ -569,5 +592,14 @@ m: PongMessage, | ||
entries: Entry[]; | ||
/** | ||
* The chunk from the transport message, if any, used for acks | ||
*/ | ||
chunk?: Chunk; | ||
}; | ||
/** | ||
* Parses a transport message from the client | ||
* | ||
* @param message The transport message to parse | ||
* @returns An object with the decoded entries from the object | ||
*/ | ||
export const parseTransport = ( | ||
@@ -616,2 +648,5 @@ message: ClientTransportMessage | ||
/** | ||
* The [EventTime](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format) ext code is 0 | ||
*/ | ||
const EVENT_TIME_EXT_TYPE = 0x00; | ||
@@ -639,3 +674,15 @@ const extensionCodec = new ExtensionCodec(); | ||
/** | ||
* Creates a newEncoder | ||
* | ||
* We can't share these because we were running into strange bugs where the internal buffers were being overwritten | ||
* @returns A new Encoder to use | ||
*/ | ||
const encoder = (): Encoder => new Encoder(extensionCodec); | ||
/** | ||
* Creates a new Decoder | ||
* | ||
* We can't share these because we were running into strange bugs where the internal buffers were being overwritten | ||
* @returns A new Decoder to use | ||
*/ | ||
const decoder = (): Decoder => new Decoder(extensionCodec); | ||
@@ -669,2 +716,8 @@ | ||
/** | ||
* Decodes a stream of data from the client | ||
* | ||
* @param dataStream A Readable to read the data from | ||
* @returns An iterable of messages from the client, not type checked | ||
*/ | ||
export const decodeClientStream = ( | ||
@@ -676,2 +729,8 @@ dataStream: Readable | ||
/** | ||
* Decodes a stream of data from the server | ||
* | ||
* @param dataStream A Readable to read the data from | ||
* @returns An iterable of messages from the server, not type checked | ||
*/ | ||
export const decodeServerStream = ( | ||
@@ -683,2 +742,9 @@ dataStream: Readable | ||
/** | ||
* Decodes a sequence of entries from a Buffer | ||
* | ||
* Useful for PackedForward|CompressedPackedForward event modes | ||
* @param data The data to unpack | ||
* @returns The entries from the data | ||
*/ | ||
export const decodeEntries = (data: Uint8Array): Entry[] => { | ||
@@ -689,6 +755,4 @@ const entries = Array.from(decoder().decodeMulti(data)); | ||
} else { | ||
throw new DecodeError( | ||
"Received invalid entries " + JSON.stringify(entries) | ||
); | ||
throw new DecodeError("Received invalid entries"); | ||
} | ||
}; |
@@ -6,10 +6,21 @@ import * as net from "net"; | ||
import * as protocol from "./protocol"; | ||
import {ResponseError} from "./error"; | ||
import {UnexpectedMessageError} from "./error"; | ||
/** | ||
* Manages the state of the client | ||
*/ | ||
enum ClientState { | ||
/** | ||
* Can receive events from this client | ||
*/ | ||
ESTABLISHED, | ||
HELO, | ||
/** | ||
* Waiting for a PING from this client | ||
*/ | ||
PING, | ||
} | ||
/** | ||
* The client information record | ||
*/ | ||
type ClientInfo = { | ||
@@ -25,16 +36,61 @@ socket: net.Socket; | ||
type FluentServerSecurityOptions = { | ||
/** | ||
* The server security hardening options | ||
*/ | ||
export type FluentServerSecurityOptions = { | ||
/** | ||
* The hostname of the server. Should be unique to this process | ||
*/ | ||
serverHostname: string; | ||
/** | ||
* The shared key to authenticate clients with | ||
*/ | ||
sharedKey: string; | ||
/** | ||
* Whether to use user authentication | ||
*/ | ||
authorize: boolean; | ||
/** | ||
* A dict of users to their passwords | ||
*/ | ||
userDict: Record<string, string>; | ||
}; | ||
type FluentServerOptions = { | ||
/** | ||
* The server setup options | ||
*/ | ||
export type FluentServerOptions = { | ||
/** | ||
* The security options. | ||
* | ||
* Defaults to undefined (no auth). | ||
*/ | ||
security?: FluentServerSecurityOptions; | ||
/** | ||
* Whether or not to keep the sockets alive. Sent in HELO, but currently ignored | ||
* | ||
* Defaults to false | ||
*/ | ||
keepalive?: boolean; | ||
/** | ||
* TLS setup options. | ||
* | ||
* See the [Node.js docs](https://nodejs.org/api/tls.html#tls_tls_createserver_options_secureconnectionlistener) for more info | ||
* | ||
* Defaults to undefined | ||
*/ | ||
tlsOptions?: tls.TlsOptions; | ||
/** | ||
* Socket listen options | ||
* | ||
* See the [Node.js docs](https://nodejs.org/api/net.html#net_server_listen_options_callback) for more info | ||
* | ||
* Defaults to {port: 0} | ||
*/ | ||
listenOptions?: net.ListenOptions; | ||
}; | ||
/** | ||
* A Fluent [Forward protocol](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1) compatible server | ||
*/ | ||
export class FluentServer extends EventEmitter { | ||
@@ -49,2 +105,7 @@ private _port: number | undefined; | ||
/** | ||
* Creates a new server | ||
* | ||
* @param options The server connection options | ||
*/ | ||
constructor(options: FluentServerOptions = {}) { | ||
@@ -67,2 +128,6 @@ super(); | ||
/** | ||
* Handles a connection event on the server | ||
* @param socket | ||
*/ | ||
private async handleConnection(socket: Socket): Promise<void> { | ||
@@ -105,3 +170,3 @@ const clientKey = socket.remoteAddress + ":" + socket.remotePort; | ||
// Unexpected PONG when we didn't send a HELO | ||
throw new ResponseError("Unexpected PING"); | ||
throw new UnexpectedMessageError("Unexpected PING"); | ||
} | ||
@@ -154,3 +219,3 @@ try { | ||
} else { | ||
throw new ResponseError("Unexpected message"); | ||
throw new UnexpectedMessageError("Unexpected message"); | ||
} | ||
@@ -164,10 +229,29 @@ } | ||
onEntry(tag: string, time: protocol.Time, record: protocol.EventRecord) { | ||
/** | ||
* Called for each entry received by the server. | ||
* | ||
* @param tag The tag of the entry | ||
* @param time The timestamp of the entry | ||
* @param record The entry record | ||
*/ | ||
protected onEntry( | ||
tag: protocol.Tag, | ||
time: protocol.Time, | ||
record: protocol.EventRecord | ||
) { | ||
this.emit("entry", tag, time, record); | ||
} | ||
get port() { | ||
/** | ||
* Returns the port the server is currently listening on | ||
*/ | ||
get port(): number | undefined { | ||
return this._port; | ||
} | ||
/** | ||
* Start the server | ||
* | ||
* @returns A Promise which resolves once the server is listening | ||
*/ | ||
listen(): Promise<void> { | ||
@@ -184,2 +268,7 @@ return new Promise(resolve => { | ||
/** | ||
* Shutdown the server | ||
* | ||
* @returns A Promise, which resolves once the server has fully shut down. | ||
*/ | ||
close(): Promise<void> { | ||
@@ -186,0 +275,0 @@ return new Promise(resolve => { |
@@ -6,3 +6,3 @@ import * as net from "net"; | ||
SocketTimeoutError, | ||
ResponseError, | ||
UnexpectedMessageError, | ||
SocketNotWritableError, | ||
@@ -15,6 +15,32 @@ AuthError, | ||
type ReconnectOptions = { | ||
/** | ||
* Reconnection settings for the socket | ||
* | ||
* The parameters represent an exponential backoff formula: | ||
* min(maxDelay, max(minDelay, backoff^attempts * delay)) | ||
* | ||
* The attempt count is incremented each time we fail a connection, | ||
* and set to zero each time a connection is successfully made. | ||
* Note this is before handshaking | ||
*/ | ||
export type ReconnectOptions = { | ||
/** | ||
* The backoff factor for each attempt | ||
* | ||
* Defaults to 2 | ||
*/ | ||
backoff: number; | ||
/** | ||
* The delay factor for each attempt | ||
* | ||
* Defaults to 500 | ||
*/ | ||
delay: number; | ||
/** | ||
* The global minimum delay | ||
*/ | ||
minDelay: number; | ||
/** | ||
* The global maximum delay | ||
*/ | ||
maxDelay: number; | ||
@@ -24,8 +50,41 @@ }; | ||
export type FluentSocketOptions = { | ||
/** | ||
* If connecting to a unix domain socket, e.g unix:///var/run/fluentd.sock, then specify that here. | ||
* | ||
* This overrides host/port below. Defaults to `undefined`. | ||
*/ | ||
path?: string; | ||
/** | ||
* The host (IP) to connect to | ||
* | ||
* Defaults to `localhost`. | ||
*/ | ||
host?: string; | ||
/** | ||
* The port to connect to | ||
* | ||
* Defaults to `24224`. | ||
*/ | ||
port?: number; | ||
/** | ||
* The socket timeout to set. After timing out, the socket will be idle and reconnect once the client wants to write something. | ||
* | ||
* Defaults to 3000 (3 seconds) | ||
*/ | ||
timeout?: number; | ||
/** | ||
* TLS connection options. See [Node docs](https://nodejs.org/api/tls.html#tls_tls_connect_options_callback) | ||
* | ||
* If provided, the socket will be a TLS socket. | ||
*/ | ||
tls?: tls.ConnectionOptions; | ||
/** | ||
* Reconnection options. See subtype for defaults. | ||
*/ | ||
reconnect?: Partial<ReconnectOptions>; | ||
/** | ||
* Disable reconnection on failure. This can be useful for one-offs | ||
* | ||
* Defaults to false | ||
*/ | ||
disableReconnect?: boolean; | ||
@@ -35,16 +94,99 @@ }; | ||
enum SocketState { | ||
DISCONNECTED, // No socket, (no read/write) | ||
CONNECTING, // Working on establishing the connection, (no read/write) | ||
CONNECTED, // Connected to the socket, but haven't finished connection negotiation, (internal read/write) | ||
ESTABLISHED, // Connected to the socket (can read/write) | ||
DRAINING, // The socket has blocked writes temporarily (no write, can read) | ||
/** | ||
* In this state, the socket doesn't exist, and we can't read or write from it. | ||
* | ||
* No read | ||
* No write | ||
* Transitions to CONNECTING on call to connect() (potentially from maybeReconnect()) | ||
*/ | ||
DISCONNECTED, | ||
/** | ||
* In this state we're working on making the connection (opening socket + TLS negotiations if any), but haven't finished. | ||
* | ||
* No read | ||
* No write | ||
* Transitions to DISCONNECTED on error | ||
* Transitions to CONNECTED on success | ||
*/ | ||
CONNECTING, | ||
/** | ||
* In this state, we're doing some preparatory work before accepting writes | ||
* | ||
* Internal read | ||
* Internal write | ||
* Transitions to DISCONNECTED on soft error (will reconnect) | ||
* Transitions to DISCONNECTING on close + medium error | ||
* Transitions to FATAL on hard error | ||
*/ | ||
CONNECTED, | ||
/** | ||
* In this state, we're fully open, and able to read and write to the socket | ||
* | ||
* Can read | ||
* Can write | ||
* Transitions to DISCONNECTED on soft error (will reconnect) | ||
* Transitions to DISCONNECTING on close + medium error | ||
* Transitions to FATAL on hard error | ||
* Tansitions to DRAINING when socket.write returns false (buffer full) | ||
* Transitions to IDLE on timeout | ||
*/ | ||
ESTABLISHED, | ||
/** | ||
* In this state, the socket has blocked writes, as the kernel buffer is full. | ||
* | ||
* Can read | ||
* No write | ||
* Transitions to ESTABLISHED on drain event | ||
* Transitions to DISCONNECTED on soft error (will reconnect) | ||
* Transitions to DISCONNECTING on close + medium error | ||
* Transitions to FATAL on hard error | ||
*/ | ||
DRAINING, | ||
/** | ||
* In this state, the socket is being closed, and will not be reconnected, either as the result of user action, or an event. | ||
* | ||
* Can read | ||
* No write | ||
* Transitions to DISCONNECTED on close event | ||
*/ | ||
DISCONNECTING, // The socket is in the process of being closed (no write, can read) | ||
IDLE, // The socket ran into an idle timeout, and will reconnect on the next write request (no read/write) | ||
FATAL, // The socket is misconfigured, and can't be recovered (no read/write) | ||
/** | ||
* In this state, the socket has timed out due to inactivity. It will be reconnected once the user calls `writable()`. | ||
* | ||
* We don't auto reconnect from this state, as the idle timeout indicates low event activity. | ||
* It can also potentially indicate a misconfiguration where the timeout is too low. | ||
* | ||
* No read | ||
* No write | ||
* Transitions to CONNECTING on call to connect() (potentially from writable()) | ||
*/ | ||
IDLE, | ||
/** | ||
* In this state, the socket has run into a fatal error, which it believes there is no point in reconnecting. | ||
* | ||
* This is almost always a configuration misconfiguration, for example the server requires auth, but the client has no auth information. | ||
* | ||
* No read | ||
* No write | ||
* Does not transition | ||
*/ | ||
FATAL, | ||
} | ||
/** | ||
* How to close the socket | ||
*/ | ||
export enum CloseState { | ||
FATAL, // Fatal state, socket is unusable | ||
CLOSE, // | ||
RECONNECT, // Reconnect from this state | ||
/** | ||
* Make the socket unable to reconnect | ||
*/ | ||
FATAL, | ||
/** | ||
* Allow the socket to reconnect manually | ||
*/ | ||
CLOSE, | ||
/** | ||
* Allow the socket to reconnect automatically | ||
*/ | ||
RECONNECT, | ||
} | ||
@@ -87,4 +229,13 @@ | ||
private reconnect: ReconnectOptions; | ||
/** | ||
* Used so we can read from the socket through an AsyncIterable. | ||
* Protects the reader from accidentally closing the socket on errors. | ||
*/ | ||
private passThroughStream: PassThrough | null = null; | ||
/** | ||
* Creates a new socket | ||
* | ||
* @param options The socket connection options | ||
*/ | ||
constructor(options: FluentSocketOptions = {}) { | ||
@@ -116,2 +267,7 @@ super(); | ||
/** | ||
* Connects the socket to the upstream server | ||
* | ||
* @returns void | ||
*/ | ||
public connect(): void { | ||
@@ -149,2 +305,6 @@ if (this.state === SocketState.FATAL) { | ||
/** | ||
* May reconnect the socket | ||
* @returns void | ||
*/ | ||
private maybeReconnect(): void { | ||
@@ -155,3 +315,3 @@ if (!this.reconnectEnabled || this.reconnectTimeoutId !== null) { | ||
if (this.state !== SocketState.DISCONNECTED) { | ||
// Socket is connected or in a fatal state | ||
// Socket is connected or in a fatal state or idle | ||
return; | ||
@@ -174,2 +334,6 @@ } | ||
/** | ||
* Creates a new TLS socket | ||
* @returns A new socket to use for the connection | ||
*/ | ||
private createTlsSocket(): tls.TLSSocket { | ||
@@ -179,2 +343,6 @@ return tls.connect({...this.tlsOptions, ...this.socketParams}); | ||
/** | ||
* Creates a new TCP socket | ||
* @returns A new socket to use for the connection | ||
*/ | ||
private createTcpSocket(): net.Socket { | ||
@@ -184,2 +352,8 @@ return net.createConnection({...this.socketParams, timeout: this.timeout}); | ||
/** | ||
* Returns a new socket | ||
* | ||
* @param onConnect Called once the socket is connected | ||
* @returns | ||
*/ | ||
private createSocket(onConnect: () => void): Duplex { | ||
@@ -197,2 +371,5 @@ if (this.tlsEnabled) { | ||
/** | ||
* Sets up the socket to be opened | ||
*/ | ||
private openSocket(): void { | ||
@@ -214,2 +391,5 @@ this.state = SocketState.CONNECTING; | ||
/** | ||
* Called once the socket is connected | ||
*/ | ||
private handleConnect(): void { | ||
@@ -222,2 +402,8 @@ this.connectAttempts = 0; | ||
/** | ||
* Processes messages from the socket | ||
* | ||
* @param iterable The socket read data stream | ||
* @returns Promise for when parsing completes | ||
*/ | ||
private async processMessages( | ||
@@ -231,6 +417,9 @@ iterable: AsyncIterable<protocol.ServerMessage> | ||
} catch (e) { | ||
this.close(CloseState.FATAL, e); | ||
this.close(CloseState.CLOSE, e); | ||
} | ||
} | ||
/** | ||
* Called from an error event on the socket | ||
*/ | ||
private handleError(error: Error): void { | ||
@@ -240,2 +429,6 @@ this.onError(error); | ||
/** | ||
* Called when the socket times out | ||
* Should suspend the socket (set it to IDLE) | ||
*/ | ||
private handleTimeout(): void { | ||
@@ -253,2 +446,7 @@ if (this.socket !== null && isConnected(this.state)) { | ||
/** | ||
* Called from a "close" event on the socket | ||
* | ||
* Should clean up the state, and potentially trigger a reconnect | ||
*/ | ||
private handleClose(): void { | ||
@@ -281,2 +479,5 @@ if (this.state === SocketState.CONNECTING) { | ||
/** | ||
* Called when the socket has fully drained, and the buffers are free again | ||
*/ | ||
private handleDrain(): void { | ||
@@ -363,3 +564,3 @@ // We may not have noticed that we were draining, or we may have moved to a different state in the mean time | ||
CloseState.CLOSE, | ||
new ResponseError("Received unexpected message") | ||
new UnexpectedMessageError("Received unexpected message") | ||
); | ||
@@ -370,3 +571,3 @@ } | ||
CloseState.CLOSE, | ||
new ResponseError("Received unexpected message") | ||
new UnexpectedMessageError("Received unexpected message") | ||
); | ||
@@ -373,0 +574,0 @@ } |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
282063
6658
315
15