data-emitter-base
Contains a set of base classes and interfaces to minimize
boilerplate code when integrating data sources. Used as a core library and only uses node modules with the exception of devDependencies,
will not add any dependencies.
IDataEmitter
This is the primary interface that all emitters implement.
Usage
Subscribe to events
import {IDataEmitter} from '@curium.rocks/data-emitter-base';
const emitter: IDataEmitter = new ADataEmitterImplementation();
const onDataListener = emitter.onData({
onData: (evt) => {
if(evt.data instanceof string){
console.log(`data: ${evt.data}`)
}
}
})
const onStatusListener = emitter.onStatus({
onStatus: (evt) => {
if(evt.bit) {
console.log("emitter is in a failed state");
} else {
console.log("emitter is in a healthy state");
}
if(evt.connected) {
console.log("emitter is connected to it's data source");
} else {
console.log("emitter is disconnected from it's data source");
}
}
})
onDataListener.dispose();
onStatusListener.dispose();
Using RxJS
import {IDataEmitter} from '@curium.rocks/data-emitter-base';
import {fromEvent} from 'rxjs';
const emitter: IDataEmitter = new ADataEmitterImplementation();
const rxDataObservable = fromEvent(emitter, 'data');
const rxStatusObservable = fromEvent(emitter, 'status');
const dataSub = rxDataObservable.subscribe((evt) => {
if(evt.data instanceof string){
console.log(`data: ${evt.data}`)
}
})
const statusSub = rxStatusObservable.subscribe((evt) => {
if(evt.bit) {
console.log("emitter is in a failed state");
} else {
console.log("emitter is in a healthy state");
}
if(evt.connected) {
console.log("emitter is connected to it's data source");
} else {
console.log("emitter is disconnected from it's data source");
}
})
dataSub.unsubscribe();
statusSub.unsubscribe();
On demand fetch
import {IDataEmitter} from '@curium.rocks/data-emitter-base';
import {IDataEvent, IStatusEvent} from "./dataEmitter";
const emitter: IDataEmitter = new ADataEmitterImplementation();
const latestData: IDataEvent = await emitter.probeCurrentData();
const latestStatus: IStatusEvent = await emitter.probeStatus();
if (latestData.data instanceof string) {
console.log(`data: ${latestData.data}`)
}
if(latestStatus.bit) {
console.log("emitter in failed state");
} else {
console.log("emitter in healthy state");
}
if(latestStatus.connected) {
console.log("emitter connected");
} else {
console.log("emitter disconnected");
}
BaseEmitter
The BaseEmitter class provides an optional implementation
of the generic portions of the IDataEmitter class to reduce
repetitive code across emitters.
Usage
Extending to wrap event emitter
import {
BaseEmitter,
ICommand,
IDataEvent,
IExecutionResult,
ISettings,
IStatusEvent,
ITraceableAction
} from "@curium.rocks/data-emitter-base";
class SignalEmitter extends BaseEmitter {
private lastDataEvent?: IDataEvent;
constructor(id: string, name: string, desc: string) {
super(id, name, desc);
process.on("SIGINT", ()=>{
this.lastDataEvent = this.buildDataEvent("SIGINT");
this.notifyDataListeners(this.lastDataEvent)
});
}
applySettings(settings: ISettings & ITraceableAction): Promise<IExecutionResult> {
return Promise.reject(new Error("Not Implemented"));
}
probeCurrentData(): Promise<IDataEvent> {
if(!this.lastDataEvent) return Promise.reject(new Error("data unavailable"));
return Promise.resolve(this.lastDataEvent);
}
probeStatus(): Promise<IStatusEvent> {
return Promise.reject(new Error("Not implemented"));
}
sendCommand(command: ICommand): Promise<IExecutionResult> {
return Promise.reject(new Error("Not Implemented"));
}
getMetaData(): unknown {
throw new Error("Not Implemented");
}
}
PollingEmitter
The PollingEmitter provides a common point for all emitters
that require timed polling to fetch data.
Usage
Extending to watch a file
import {ICommand, IExecutionResult, PollingEmitter} from "@curium.rocks/data-emitter-base"
import fs from 'fs';
class FilePollingEmitter extends PollingEmitter {
poll(): Promise<unknown> {
return new Promise((resolve, reject)=>{
fs.readFile('./test.txt', 'utf8' , (err, data) => {
if (err) {
return reject(err);
}
resolve(data);
})
});
}
sendCommand(command: ICommand): Promise<IExecutionResult> {
return Promise.resolve({
success: true,
actionId: command.actionId
})
}
getMetaData(): unknown {
return {
example: "example-val"
}
}
}
Integrations
Check here for more integrations that implement these interfaces.
Creating A New Emitter
If you are interested in creating a new emitter you can use this template repository.