@getanthill/datastore
Advanced tools
Comparing version 0.4.13 to 0.4.14
@@ -5,5 +5,7 @@ /// <reference types="node" /> | ||
import type { DatastoreImportFixture, ModelConfig } from '../../typings'; | ||
import type { Telemetry } from '@getanthill/telemetry/typings'; | ||
export declare const ERROR_MISSING_MODEL_NAME: Error; | ||
export declare const ERROR_MISSING_CORRELATION_ID: Error; | ||
export declare const ERROR_MISSING_JSON_PATCH: Error; | ||
export declare const ERROR_STREAM_MAX_RECONNECTION_ATTEMPTS_REACHED: Error; | ||
export interface DatastoreConfig { | ||
@@ -19,3 +21,3 @@ baseUrl?: string; | ||
axios: AxiosInstance; | ||
telemetry: null; | ||
telemetry: Telemetry; | ||
CancelToken: import("axios").CancelTokenStatic; | ||
@@ -84,2 +86,3 @@ streams: Map<string, Function>; | ||
reconnectionInterval?: number; | ||
reconnectionMaxAttempts?: number; | ||
}): Promise<Function>; | ||
@@ -86,0 +89,0 @@ /** |
@@ -15,5 +15,6 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.ERROR_MISSING_JSON_PATCH = exports.ERROR_MISSING_CORRELATION_ID = exports.ERROR_MISSING_MODEL_NAME = void 0; | ||
exports.ERROR_STREAM_MAX_RECONNECTION_ATTEMPTS_REACHED = exports.ERROR_MISSING_JSON_PATCH = exports.ERROR_MISSING_CORRELATION_ID = exports.ERROR_MISSING_MODEL_NAME = void 0; | ||
const util_1 = __importDefault(require("util")); | ||
const events_1 = require("events"); | ||
const telemetry_1 = require("@getanthill/telemetry"); | ||
const lodash_1 = require("lodash"); | ||
@@ -24,2 +25,3 @@ const axios_1 = __importDefault(require("axios")); | ||
exports.ERROR_MISSING_JSON_PATCH = new Error('Missing JSON Patch'); | ||
exports.ERROR_STREAM_MAX_RECONNECTION_ATTEMPTS_REACHED = new Error('Max reconnection attempts reached for streaming'); | ||
function mergeWithArrays(objValue, srcValue) { | ||
@@ -55,2 +57,3 @@ if (Array.isArray(objValue)) { | ||
this.axios = null; | ||
this.telemetry = telemetry_1.telemetry; | ||
this.CancelToken = axios_1.default.CancelToken; | ||
@@ -395,2 +398,3 @@ this.streams = new Map(); | ||
if (this.streams.has(streamId)) { | ||
this.telemetry.logger.debug('[Datastore] Stream already registered'); | ||
return Promise.resolve(this.streams.get(streamId)); | ||
@@ -420,2 +424,5 @@ } | ||
} | ||
this.telemetry.logger.info('[Datastore#closeAll] All streams closed', { | ||
url: this.config.baseUrl, | ||
}); | ||
} | ||
@@ -431,5 +438,14 @@ /** | ||
reconnectionInterval: 100, | ||
reconnectionMaxAttempts: 1000, | ||
}) { | ||
var _a; | ||
const sourceRequest = this.CancelToken.source(); | ||
this.telemetry.logger.info('[Datastore#stream] Initialization', { | ||
url: this.config.baseUrl, | ||
model, | ||
source, | ||
data, | ||
options, | ||
}); | ||
let reconnectionAttemptsCount = 0; | ||
let sourceRequest = null; | ||
let mustReconnect = (_a = options.reconnect) !== null && _a !== void 0 ? _a : true; | ||
@@ -439,3 +455,17 @@ let res; | ||
const reconnect = (err) => { | ||
if (!!err) { | ||
this.telemetry.logger.warn('[Datastore#stream] Reconnecting after error', { | ||
err, | ||
}); | ||
} | ||
reconnectionAttemptsCount += 1; | ||
if (reconnectionAttemptsCount > options.reconnectionMaxAttempts) { | ||
return close(exports.ERROR_STREAM_MAX_RECONNECTION_ATTEMPTS_REACHED); | ||
} | ||
if (mustReconnect !== false) { | ||
this.telemetry.logger.info('[Datastore#stream] Reconnecting', { | ||
url: this.config.baseUrl, | ||
reconnection_attempts_count: reconnectionAttemptsCount, | ||
reconnection_max_attempts: options.reconnectionMaxAttempts, | ||
}); | ||
clearTimeout(reconnectionTimeout); | ||
@@ -445,9 +475,40 @@ reconnectionTimeout = setTimeout(connect, options.reconnectionInterval || 100); | ||
else { | ||
this.telemetry.logger.info('[Datastore#stream] Closing stream', { | ||
url: this.config.baseUrl, | ||
must_reconnect: mustReconnect, | ||
}); | ||
close(err); | ||
} | ||
}; | ||
const onData = (chunk) => { | ||
const data = Buffer.from(chunk).toString(); | ||
if (data === ']') { | ||
this.telemetry.logger.debug('[Datastore#stream] Receiving stream ending message', { | ||
data, | ||
}); | ||
close(); | ||
} | ||
if (data === '[' || data === ']' || data === ',') { | ||
this.telemetry.logger.debug('[Datastore#stream] Receiving unique special character message (skipping)', { | ||
data, | ||
}); | ||
return; | ||
} | ||
handler(data); | ||
}; | ||
const close = (err) => { | ||
mustReconnect = false; | ||
sourceRequest.cancel(); | ||
clean(); | ||
this.telemetry.logger.info('[Datastore#stream] Stream closed', { | ||
url: this.config.baseUrl, | ||
model, | ||
source, | ||
}); | ||
if (err) { | ||
this.telemetry.logger.warn('[Datastore#stream] Stream closed after error', { | ||
url: this.config.baseUrl, | ||
model, | ||
source, | ||
err, | ||
}); | ||
throw err; | ||
@@ -457,4 +518,18 @@ } | ||
}; | ||
const clean = () => { | ||
if (!!sourceRequest) { | ||
sourceRequest.cancel(); | ||
} | ||
if (res) { | ||
res.data.removeListener('error', reconnect); | ||
res.data.removeListener('close', reconnect); | ||
res.data.removeListener('data', onData); | ||
res = null; | ||
} | ||
sourceRequest = this.CancelToken.source(); | ||
}; | ||
const connect = () => __awaiter(this, void 0, void 0, function* () { | ||
try { | ||
clean(); | ||
clearTimeout(reconnectionTimeout); | ||
res = yield this.axios.request({ | ||
@@ -468,14 +543,10 @@ method: 'post', | ||
}); | ||
clearTimeout(reconnectionTimeout); | ||
reconnectionAttemptsCount = 0; | ||
res.data.on('error', reconnect); | ||
res.data.on('close', reconnect); | ||
res.data.on('data', (chunk) => { | ||
const data = Buffer.from(chunk).toString(); | ||
if (data === ']') { | ||
close(); | ||
} | ||
if (data === '[' || data === ']' || data === ',') { | ||
return; | ||
} | ||
handler(data); | ||
res.data.on('data', onData); | ||
this.telemetry.logger.info('[Datastore#stream] Connected', { | ||
url: this.config.baseUrl, | ||
model, | ||
source, | ||
}); | ||
@@ -482,0 +553,0 @@ } |
{ | ||
"name": "@getanthill/datastore", | ||
"description": "Event-Sourced Datastore", | ||
"version": "0.4.13", | ||
"version": "0.4.14", | ||
"main": "dist/src/index.js", | ||
@@ -6,0 +6,0 @@ "types": "dist/src/index.d.ts", |
// This file is generated by Sapper — do not edit it! | ||
export const timestamp = 1629801517519; | ||
export const timestamp = 1629811552179; | ||
@@ -4,0 +4,0 @@ export const files = [ |
@@ -305,3 +305,4 @@ import util from 'util'; | ||
expect(client.CancelToken.source).toHaveBeenCalledTimes(1); | ||
// Reconnect + close = 2 | ||
expect(client.CancelToken.source).toHaveBeenCalledTimes(2); | ||
expect(cancel).toHaveBeenCalledTimes(1); | ||
@@ -308,0 +309,0 @@ }); |
import util from 'util'; | ||
import { EventEmitter } from 'events'; | ||
import { telemetry } from '@getanthill/telemetry'; | ||
@@ -8,2 +9,3 @@ import { omit, mapValues, isObject, mergeWith } from 'lodash'; | ||
import type { DatastoreImportFixture, ModelConfig } from '../../typings'; | ||
import type { Telemetry } from '@getanthill/telemetry/typings'; | ||
@@ -13,2 +15,5 @@ export const ERROR_MISSING_MODEL_NAME = new Error('Missing Model name'); | ||
export const ERROR_MISSING_JSON_PATCH = new Error('Missing JSON Patch'); | ||
export const ERROR_STREAM_MAX_RECONNECTION_ATTEMPTS_REACHED = new Error( | ||
'Max reconnection attempts reached for streaming', | ||
); | ||
@@ -52,3 +57,3 @@ function mergeWithArrays(objValue, srcValue) { | ||
axios: AxiosInstance = null; | ||
telemetry: null; | ||
telemetry: Telemetry = telemetry; | ||
CancelToken = axios.CancelToken; | ||
@@ -603,2 +608,3 @@ | ||
if (this.streams.has(streamId)) { | ||
this.telemetry.logger.debug('[Datastore] Stream already registered'); | ||
return Promise.resolve(this.streams.get(streamId)); | ||
@@ -640,2 +646,6 @@ } | ||
} | ||
this.telemetry.logger.info('[Datastore#closeAll] All streams closed', { | ||
url: this.config.baseUrl, | ||
}); | ||
} | ||
@@ -657,2 +667,3 @@ | ||
reconnectionInterval?: number; | ||
reconnectionMaxAttempts?: number; | ||
} = { | ||
@@ -662,5 +673,14 @@ output: 'entity', | ||
reconnectionInterval: 100, | ||
reconnectionMaxAttempts: 1000, | ||
}, | ||
): Promise<Function> { | ||
const sourceRequest = this.CancelToken.source(); | ||
this.telemetry.logger.info('[Datastore#stream] Initialization', { | ||
url: this.config.baseUrl, | ||
model, | ||
source, | ||
data, | ||
options, | ||
}); | ||
let reconnectionAttemptsCount: number = 0; | ||
let sourceRequest = null; | ||
let mustReconnect = options.reconnect ?? true; | ||
@@ -671,3 +691,24 @@ | ||
const reconnect = (err?) => { | ||
if (!!err) { | ||
this.telemetry.logger.warn( | ||
'[Datastore#stream] Reconnecting after error', | ||
{ | ||
err, | ||
}, | ||
); | ||
} | ||
reconnectionAttemptsCount += 1; | ||
if (reconnectionAttemptsCount > options.reconnectionMaxAttempts) { | ||
return close(ERROR_STREAM_MAX_RECONNECTION_ATTEMPTS_REACHED); | ||
} | ||
if (mustReconnect !== false) { | ||
this.telemetry.logger.info('[Datastore#stream] Reconnecting', { | ||
url: this.config.baseUrl, | ||
reconnection_attempts_count: reconnectionAttemptsCount, | ||
reconnection_max_attempts: options.reconnectionMaxAttempts, | ||
}); | ||
clearTimeout(reconnectionTimeout); | ||
@@ -679,2 +720,6 @@ reconnectionTimeout = setTimeout( | ||
} else { | ||
this.telemetry.logger.info('[Datastore#stream] Closing stream', { | ||
url: this.config.baseUrl, | ||
must_reconnect: mustReconnect, | ||
}); | ||
close(err); | ||
@@ -684,7 +729,47 @@ } | ||
const onData = (chunk) => { | ||
const data = Buffer.from(chunk).toString(); | ||
if (data === ']') { | ||
this.telemetry.logger.debug( | ||
'[Datastore#stream] Receiving stream ending message', | ||
{ | ||
data, | ||
}, | ||
); | ||
close(); | ||
} | ||
if (data === '[' || data === ']' || data === ',') { | ||
this.telemetry.logger.debug( | ||
'[Datastore#stream] Receiving unique special character message (skipping)', | ||
{ | ||
data, | ||
}, | ||
); | ||
return; | ||
} | ||
handler(data); | ||
}; | ||
const close = (err?) => { | ||
mustReconnect = false; | ||
sourceRequest.cancel(); | ||
clean(); | ||
this.telemetry.logger.info('[Datastore#stream] Stream closed', { | ||
url: this.config.baseUrl, | ||
model, | ||
source, | ||
}); | ||
if (err) { | ||
this.telemetry.logger.warn( | ||
'[Datastore#stream] Stream closed after error', | ||
{ | ||
url: this.config.baseUrl, | ||
model, | ||
source, | ||
err, | ||
}, | ||
); | ||
throw err; | ||
@@ -696,4 +781,22 @@ } | ||
const clean = () => { | ||
if (!!sourceRequest) { | ||
sourceRequest.cancel(); | ||
} | ||
if (res) { | ||
res.data.removeListener('error', reconnect); | ||
res.data.removeListener('close', reconnect); | ||
res.data.removeListener('data', onData); | ||
res = null; | ||
} | ||
sourceRequest = this.CancelToken.source(); | ||
}; | ||
const connect = async () => { | ||
try { | ||
clean(); | ||
clearTimeout(reconnectionTimeout); | ||
res = await this.axios.request({ | ||
@@ -708,19 +811,12 @@ method: 'post', | ||
clearTimeout(reconnectionTimeout); | ||
reconnectionAttemptsCount = 0; | ||
res.data.on('error', reconnect); | ||
res.data.on('close', reconnect); | ||
res.data.on('data', onData); | ||
res.data.on('data', (chunk) => { | ||
const data = Buffer.from(chunk).toString(); | ||
if (data === ']') { | ||
close(); | ||
} | ||
if (data === '[' || data === ']' || data === ',') { | ||
return; | ||
} | ||
handler(data); | ||
this.telemetry.logger.info('[Datastore#stream] Connected', { | ||
url: this.config.baseUrl, | ||
model, | ||
source, | ||
}); | ||
@@ -727,0 +823,0 @@ } catch (err) { |
Sorry, the diff of this file is not supported yet
1232512
23421