DataStreamer
Data streamer for streaming-analytics test platforms. Takes batch CSV/JSON data and streams it to targets (Kafka, HTTP).
Installation

DataStreamer at NPM (version 1.2.7)
Install with npm: npm install datastreamer
Details of DataStreamer
DataStreamer(configName, lineListener, pauseListener, resumeListener, streamListener, extraFields);
Parameters
- configName: Name or path of the config file in JSON format, without the extension (if your config file is 'config.json', configName will be 'config').
- lineListener: Callback function triggered for every line read from the file. It takes these parameters:
  - fileStream: File stream used to pause, resume, and close the stream.
  - fieldNames: Field names of the data. You can add extra fields by passing their names as an array to the extraFields parameter of the DataStreamer constructor.
  - fieldValues: Field values of the data. You can append the values of the extra fields to this array.
  - jsonGenerator: JSON generator for the data schema given by the config file's dataSchema attribute (a sketch follows this list).
function lineListener(fileStream, fieldNames, fieldValues, jsonGenerator) { }
- pauseListener: Callback function triggered when the stream is paused.
function pauseListener() { }
- resumeListener: Callback function triggered when the stream is resumed.
function resumeListener() { }
- streamListener: Callback function triggered as data is streamed to the target; it receives the Kafka buffer and the file stream.
function streamListener(kafkaBuffer, fileStream) { }
- extraFields: Array of extra field names to append to fieldNames (see lineListener above).
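For example, a sketch of what a single call to the listener works with (the exact JSON string produced by generateJSON is determined by the library and the config's dataSchema; the field names and values here are illustrative):

// Inside lineListener, with a config whose dataSchema is "double,string",
// fieldNames might be ["amount", "type"] and fieldValues [42.5, "PAYMENT"].
var json = jsonGenerator.generateJSON(fieldNames, fieldValues);
// json is assumed to be a JSON document along the lines of {"amount":42.5,"type":"PAYMENT"},
// ready to pass to dataStreamer.pushToKafka(json).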
Usage of DataStreamer
var DataStreamer = require('datastreamer');

var begin = Date.now();
var vars = {
    "queue": [],        // holds records whose send time has not arrived yet
    "timestamp": begin, // base timestamp for replaying the data
    "tx_id": 1          // running transaction counter, also used as a time offset
};

var dataStreamer = new DataStreamer("paysim-config",
                                    lineListener,
                                    null,
                                    null,
                                    null,
                                    ["timestamp", "tx_id"]);

function lineListener(fileStream, fieldNames, fieldValues, jsonGenerator) {
    // Append values for the two extra fields declared in the constructor.
    fieldValues.push(vars["timestamp"] + vars["tx_id"]);
    fieldValues.push(vars["tx_id"]);
    var now = Date.now();
    var timestamp = vars["timestamp"] + vars["tx_id"];
    if (timestamp < now) {
        // The record's send time has already passed: push it immediately.
        dataStreamer.pushToKafka(jsonGenerator.generateJSON(fieldNames, fieldValues));
    } else {
        // Otherwise queue it until checkSendingTime() decides it is due.
        vars["queue"].push({ "timestamp": timestamp, "data": jsonGenerator.generateJSON(fieldNames, fieldValues) });
    }
    ++vars["tx_id"];
}

function checkSendingTime() {
    // Drain every queued record whose send time has arrived.
    var currentTime = Date.now();
    for (var i = 0; i < vars["queue"].length; ++i) {
        if (vars["queue"][i]["timestamp"] <= currentTime) {
            dataStreamer.pushToKafka(vars["queue"][i]["data"]);
            vars["queue"].splice(i, 1);
            --i; // stay at the same index after removing an element
        }
    }
    setTimeout(checkSendingTime, 1);
}

dataStreamer.startStream();
checkSendingTime();
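If you do not need the timestamped replay above, a minimal listener can push every record as soon as it is read. A sketch against the same API (passing an empty extraFields array here is an assumption):

var DataStreamer = require('datastreamer');

var simpleStreamer = new DataStreamer("paysim-config", function (fileStream, fieldNames, fieldValues, jsonGenerator) {
    // No extra fields and no scheduling: forward each record immediately.
    simpleStreamer.pushToKafka(jsonGenerator.generateJSON(fieldNames, fieldValues));
}, null, null, null, []);

simpleStreamer.startStream();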
Configuration of DataStreamer
Config for Kafka target
{
    "filename": "nyc-fraud.json",
    "dataSchema": "integer,string,double,string,double,double,string,double,double,integer,integer",
    "chunkSize": 1000,
    "triggerInterval": 200,
    "loggerEnabled": false,
    "target": {
        "type": "kafka",
        "config": {
            "topic": "nyc-fraud.poc",
            "connectionString": "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181",
            "clientId": "nyc-fraud",
            "zkOptions": {
                "sessionTimeout": 30000,
                "spinDelay": 1000,
                "retries": 10
            }
        }
    }
}
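To verify that records arrive on the topic, a minimal consumer sketch is shown below. It assumes the kafka-node package (which the ZooKeeper-style connectionString above suggests) and its pre-4.0 Client API; adapt it to whatever Kafka client your environment uses:

var kafka = require('kafka-node');

// Connect through ZooKeeper, mirroring the connectionString in the config above.
var client = new kafka.Client("zookeeper1:2181,zookeeper2:2181,zookeeper3:2181", "nyc-fraud-consumer");
var consumer = new kafka.Consumer(client, [{ topic: "nyc-fraud.poc" }], { autoCommit: true });

consumer.on('message', function (message) {
    // Each message.value is one JSON record produced by DataStreamer.
    console.log(message.value);
});

consumer.on('error', function (err) {
    console.error(err);
});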
Config for HTTP target
{
    "filename": "nyc-fraud.json",
    "dataSchema": "integer,string,double,string,double,double,string,double,double,integer,integer",
    "chunkSize": 1000,
    "triggerInterval": 200,
    "loggerEnabled": false,
    "target": {
        "type": "http",
        "config": {
            "hostname": "127.0.0.1",
            "port": 12345,
            "method": "POST",
            "path": "/nyc-fraud",
            "headers": {
                "contentType": "application/json"
            }
        }
    }
}
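For local testing of the HTTP target, a minimal receiver built on Node's http module might look like this (a sketch; how many records DataStreamer packs into one request body depends on its chunking):

var http = require('http');

// Listen on the hostname, port, and path configured above and log each POST body.
http.createServer(function (req, res) {
    if (req.method === 'POST' && req.url === '/nyc-fraud') {
        var body = '';
        req.on('data', function (chunk) { body += chunk; });
        req.on('end', function () {
            console.log(body);
            res.writeHead(200);
            res.end();
        });
    } else {
        res.writeHead(404);
        res.end();
    }
}).listen(12345, '127.0.0.1');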
Configuration attributes:
- filename: Name or path of the data file.
- dataSchema: Data types of the data's columns, as a comma-separated list.
- target: Configuration of the stream target.
- chunkSize: Number of records written per triggerInterval.
- triggerInterval: Period between write operations.
- loggerEnabled: Flag that enables logging to a file or the console.
- loggerType: Type of logger; can be file or console.
- logFilename: If loggerType is file, the logger creates the log file with this name (see the example below).
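Putting the logging attributes together, a config with file logging enabled might look like this (a sketch; only the three logging attributes differ from the Kafka example above):

{
    "filename": "nyc-fraud.json",
    "dataSchema": "integer,string,double,string,double,double,string,double,double,integer,integer",
    "chunkSize": 1000,
    "triggerInterval": 200,
    "loggerEnabled": true,
    "loggerType": "file",
    "logFilename": "datastreamer.log",
    "target": {
        "type": "kafka",
        "config": {
            "topic": "nyc-fraud.poc",
            "connectionString": "zookeeper1:2181",
            "clientId": "nyc-fraud"
        }
    }
}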