@observertc/observer-js
Advanced tools
Comparing version 0.9.13-beta to 0.9.14-beta
@@ -8,3 +8,3 @@ import { Semaphore } from './common/Semaphore'; | ||
export type EvaluatorEvents = { | ||
ready: undefined; | ||
ready: Error | undefined; | ||
}; | ||
@@ -36,2 +36,3 @@ export type EvaluatorConfig = { | ||
on<K extends keyof EvaluatorEvents>(event: K, listener: (data: EvaluatorEvents[K]) => void): this; | ||
once<K extends keyof EvaluatorEvents>(event: K, listener: (data: EvaluatorEvents[K]) => void): this; | ||
off<K extends keyof EvaluatorEvents>(event: K, listener: (data: EvaluatorEvents[K]) => void): this; | ||
@@ -38,0 +39,0 @@ private _emit; |
@@ -62,2 +62,6 @@ "use strict"; | ||
} | ||
once(event, listener) { | ||
this._emitter.once(event, listener); | ||
return this; | ||
} | ||
off(event, listener) { | ||
@@ -144,12 +148,19 @@ this._emitter.removeListener(event, listener); | ||
async _process(callOperationsContext, evaluatorContext) { | ||
await this._callSemaphore.acquire(); | ||
await this._callProcessor.use(callOperationsContext).finally(() => { | ||
this._callSemaphore.release(); | ||
const process = async () => { | ||
await this._callSemaphore.acquire(); | ||
await this._callProcessor.use(callOperationsContext).finally(() => { | ||
this._callSemaphore.release(); | ||
}); | ||
const transactionContext = await (0, TransactionContext_1.createTransactionContext)(evaluatorContext, this._storageProvider, evaluatorContext.observedCalls, evaluatorContext.observedSfus); | ||
await this._transactionProcessor.use(transactionContext); | ||
await this._customProcessor.use(evaluatorContext); | ||
}; | ||
await process().then(() => { | ||
this._emit('ready', undefined); | ||
}).catch(err => { | ||
logger.warn(`Error occurred while evaluating samples`, err); | ||
this._emit('ready', err); | ||
}); | ||
const transactionContext = await (0, TransactionContext_1.createTransactionContext)(evaluatorContext, this._storageProvider, evaluatorContext.observedCalls, evaluatorContext.observedSfus); | ||
await this._transactionProcessor.use(transactionContext); | ||
await this._customProcessor.use(evaluatorContext); | ||
this._emit('ready', undefined); | ||
} | ||
} | ||
exports.Evaluator = Evaluator; |
@@ -50,2 +50,3 @@ import { ObserverReportsEmitter, ObserverSinkProcess, SinkConfig } from './sinks/ObserverSink'; | ||
createSfuSource(config: PartialBy<ObservedSfuSourceConfig, 'serviceId' | 'mediaUnitId' | 'joined'>): ObservedSfuSource; | ||
evaluate(): Promise<void>; | ||
get reports(): ObserverReportsEmitter; | ||
@@ -52,0 +53,0 @@ addEvaluators(...processes: EvaluatorProcess[]): void; |
@@ -126,2 +126,13 @@ "use strict"; | ||
} | ||
async evaluate() { | ||
return new Promise((resolve, reject) => { | ||
this._evaluator.once('ready', err => { | ||
if (err) | ||
reject(err); | ||
else | ||
resolve(); | ||
}); | ||
this._sources.emitSamples(); | ||
}); | ||
} | ||
get reports() { | ||
@@ -128,0 +139,0 @@ return this._sink; |
@@ -23,7 +23,8 @@ import { ObservedCalls } from '../samples/ObservedCalls'; | ||
readonly config: SourcesConfig; | ||
private _clientSources; | ||
private _sfuSources; | ||
private readonly _warnFlags; | ||
private readonly _clientSources; | ||
private readonly _sfuSources; | ||
private readonly _emitter; | ||
private _observedCallsBuilder; | ||
private _observedSfusBuilder; | ||
private _emitter; | ||
private _timer?; | ||
@@ -38,2 +39,3 @@ private _numberOfSamples; | ||
private _incrementSamples; | ||
emitSamples(): void; | ||
private _emitSamples; | ||
@@ -40,0 +42,0 @@ private _resetTimer; |
@@ -12,7 +12,8 @@ "use strict"; | ||
this.config = config; | ||
this._warnFlags = { autoEmitButEmitSamplesInvoked: false }; | ||
this._clientSources = new Map(); | ||
this._sfuSources = new Map(); | ||
this._emitter = new events_1.EventEmitter(); | ||
this._observedCallsBuilder = new ObservedCalls_1.ObservedCallsBuilder(); | ||
this._observedSfusBuilder = new ObservedSfus_1.ObservedSfusBuilder(); | ||
this._emitter = new events_1.EventEmitter(); | ||
this._numberOfSamples = 0; | ||
@@ -129,2 +130,13 @@ } | ||
} | ||
emitSamples() { | ||
if (0 < this.config.maxSamples || 0 < this.config.maxTimeInMs) { | ||
if (!this._warnFlags.autoEmitButEmitSamplesInvoked) { | ||
logger.warn(`Emitting Samples is called explicitly, | ||
but samples are emitted automatically because of configuration | ||
(maxSamples: ${this.config.maxSamples}, maxTImeInMs: ${this.config.maxTimeInMs})`); | ||
this._warnFlags.autoEmitButEmitSamplesInvoked = true; | ||
} | ||
} | ||
this._emitSamples(); | ||
} | ||
_emitSamples() { | ||
@@ -131,0 +143,0 @@ this._resetTimer(); |
{ | ||
"name": "@observertc/observer-js", | ||
"version": "0.9.13-beta", | ||
"version": "0.9.14-beta", | ||
"description": "Server Side NodeJS Library for processing ObserveRTC Samples", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
@@ -19,3 +19,3 @@ import { createLogger } from './common/logger'; | ||
export type EvaluatorEvents = { | ||
ready: undefined; | ||
ready: Error | undefined; | ||
}; | ||
@@ -102,2 +102,7 @@ | ||
public once<K extends keyof EvaluatorEvents>(event: K, listener: (data: EvaluatorEvents[K]) => void): this { | ||
this._emitter.once(event, listener); | ||
return this; | ||
} | ||
public off<K extends keyof EvaluatorEvents>(event: K, listener: (data: EvaluatorEvents[K]) => void): this { | ||
@@ -195,19 +200,25 @@ this._emitter.removeListener(event, listener); | ||
): Promise<void> { | ||
await this._callSemaphore.acquire(); | ||
await this._callProcessor.use(callOperationsContext).finally(() => { | ||
this._callSemaphore.release(); | ||
const process = async () => { | ||
await this._callSemaphore.acquire(); | ||
await this._callProcessor.use(callOperationsContext).finally(() => { | ||
this._callSemaphore.release(); | ||
}); | ||
const transactionContext = await createTransactionContext( | ||
evaluatorContext, | ||
this._storageProvider, | ||
evaluatorContext.observedCalls, | ||
evaluatorContext.observedSfus, | ||
); | ||
await this._transactionProcessor.use(transactionContext); | ||
await this._customProcessor.use(evaluatorContext); | ||
}; | ||
await process().then(() => { | ||
this._emit('ready', undefined); | ||
}).catch(err => { | ||
logger.warn(`Error occurred while evaluating samples`, err); | ||
this._emit('ready', err); | ||
}); | ||
const transactionContext = await createTransactionContext( | ||
evaluatorContext, | ||
this._storageProvider, | ||
evaluatorContext.observedCalls, | ||
evaluatorContext.observedSfus, | ||
); | ||
await this._transactionProcessor.use(transactionContext); | ||
await this._customProcessor.use(evaluatorContext); | ||
this._emit('ready', undefined); | ||
} | ||
} |
@@ -176,2 +176,12 @@ import { ObserverReportsEmitter, ObserverSinkProcess, SinkConfig, SinkImpl } from './sinks/ObserverSink'; | ||
public async evaluate() { | ||
return new Promise<void>((resolve, reject) => { | ||
this._evaluator.once('ready', err => { | ||
if (err) reject(err); | ||
else resolve(); | ||
}); | ||
this._sources.emitSamples(); | ||
}); | ||
} | ||
public get reports(): ObserverReportsEmitter { | ||
@@ -178,0 +188,0 @@ return this._sink; |
@@ -31,7 +31,10 @@ import { ClientSample, SfuSample } from '@observertc/sample-schemas-js'; | ||
export class Sources { | ||
private _clientSources = new Map<string, ObservedClientSource>(); | ||
private _sfuSources = new Map<string, ObservedSfuSource>(); | ||
private readonly _warnFlags = { autoEmitButEmitSamplesInvoked: false }; | ||
private readonly _clientSources = new Map<string, ObservedClientSource>(); | ||
private readonly _sfuSources = new Map<string, ObservedSfuSource>(); | ||
private readonly _emitter = new EventEmitter(); | ||
private _observedCallsBuilder = new ObservedCallsBuilder(); | ||
private _observedSfusBuilder = new ObservedSfusBuilder(); | ||
private _emitter = new EventEmitter(); | ||
private _timer?: ReturnType<typeof setTimeout>; | ||
@@ -159,2 +162,15 @@ private _numberOfSamples = 0; | ||
public emitSamples() { | ||
if (0 < this.config.maxSamples || 0 < this.config.maxTimeInMs) { | ||
if (!this._warnFlags.autoEmitButEmitSamplesInvoked) { | ||
logger.warn(`Emitting Samples is called explicitly, | ||
but samples are emitted automatically because of configuration | ||
(maxSamples: ${this.config.maxSamples}, maxTImeInMs: ${this.config.maxTimeInMs})` | ||
); | ||
this._warnFlags.autoEmitButEmitSamplesInvoked = true; | ||
} | ||
} | ||
this._emitSamples(); | ||
} | ||
private _emitSamples() { | ||
@@ -161,0 +177,0 @@ this._resetTimer(); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
1010888
20422