
datastreamer
Data streamer for streaming analytics test platforms. Takes batch CSV/JSON data and streams it to targets (Kafka, HTTP).
DataStreamer on npm (version 1.2.7)
Install with npm: npm install datastreamer
DataStreamer(configName, lineListener, pauseListener, resumeListener, streamListener, extraFields);
configName: Name or path of the config file in JSON format, without the extension (if your config file is 'config.json', configName is 'config').
lineListener: Callback function triggered for every line read from the file. It receives these parameters:
fileStream: The file stream, used to pause, resume, or close the stream.
fieldNames: Field names of the data. You can add extra fields by passing their names as an array in the extraFields parameter of the DataStreamer constructor.
fieldValues: Field values of the data. You can append the values of the extra fields to this array.
jsonGenerator: JSON generator for the data schema given in the config file's dataSchema attribute.
function lineListener(fileStream, fieldNames, fieldValues, jsonGenerator) { /* your implementation */ }
pauseListener: Callback function triggered when the file stream is paused:
function pauseListener() { /* your implementation */ }
resumeListener: Callback function triggered when the file stream is resumed:
function resumeListener() { /* your implementation */ }
streamListener: Callback function triggered when data has been streamed to Kafka. This function takes kafkaBuffer and fileStream as parameters (see the sketch below):
function streamListener(kafkaBuffer, fileStream) { /* your implementation */ }
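
Together, these callbacks allow simple back-pressure between file reading and Kafka delivery. Below is a minimal standalone sketch, assuming fileStream exposes Node's standard pause()/resume() stream methods (the description above only says it can pause, resume, and close the stream); the inFlight counter, the LIMIT threshold, and the "my-config" name are illustrative, not part of the library:

// Standalone sketch: throttle file reading while Kafka delivery is pending.
var inFlight = 0;
var LIMIT = 5000; // illustrative threshold, not a library constant

function lineListener(fileStream, fieldNames, fieldValues, jsonGenerator) {
    dataStreamer.pushToKafka(jsonGenerator.generateJSON(fieldNames, fieldValues));
    if (++inFlight >= LIMIT) {
        fileStream.pause(); // stop reading until Kafka catches up
    }
}

function streamListener(kafkaBuffer, fileStream) {
    inFlight = 0;        // a chunk was delivered to Kafka
    fileStream.resume(); // safe to read again
}

function pauseListener() { console.log('reading paused'); }
function resumeListener() { console.log('reading resumed'); }

var dataStreamer = new DataStreamer("my-config", // hypothetical config name
    lineListener, pauseListener, resumeListener, streamListener, []);
dataStreamer.startStream();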
Example usage:

var DataStreamer = require('datastreamer');

var begin = Date.now();
var vars = {
    queue: [],        // records whose timestamps are still in the future
    timestamp: begin, // base timestamp for generated records
    tx_id: 1          // running transaction id
};

// Config file name without the file extension (paysim-config.json must exist).
// pauseListener, resumeListener and streamListener are unused here, so null is passed.
var dataStreamer = new DataStreamer("paysim-config",
    lineListener,
    null,
    null,
    null,
    ["timestamp", "tx_id"]); // extra fields appended to every record

function lineListener(fileStream, fieldNames, fieldValues, jsonGenerator) {
    var timestamp = vars.timestamp + vars.tx_id;
    // Append values for the extra fields declared in the constructor.
    fieldValues.push(timestamp);
    fieldValues.push(vars.tx_id);
    if (timestamp < Date.now()) {
        // Timestamp already in the past: send immediately.
        dataStreamer.pushToKafka(jsonGenerator.generateJSON(fieldNames, fieldValues));
    } else {
        // Timestamp in the future: queue it for checkSendingTime.
        vars.queue.push({ timestamp: timestamp, data: jsonGenerator.generateJSON(fieldNames, fieldValues) });
    }
    ++vars.tx_id;
}

function checkSendingTime() {
    var currentTime = Date.now();
    for (var i = 0; i < vars.queue.length; ++i) {
        if (vars.queue[i].timestamp <= currentTime) {
            dataStreamer.pushToKafka(vars.queue[i].data);
            vars.queue.splice(i, 1);
            --i; // stay at the same index after removing an element
        }
    }
    setTimeout(checkSendingTime, 1);
}

dataStreamer.startStream();
checkSendingTime();
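
The example above replays a dataset against the wall clock: each record is assigned a synthetic timestamp one millisecond after the previous one (begin + tx_id), records whose timestamp has already passed are pushed to Kafka immediately, and records scheduled for the future are queued and flushed by checkSendingTime, which polls roughly once a millisecond.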
Example config file for a Kafka target:

{
  "filename": "nyc-fraud.json",
  "dataSchema": "integer,string,double,string,double,double,string,double,double,integer,integer",
  "chunkSize": 1000,
  "triggerInterval": 200,
  "loggerEnabled": false,
  "target": {
    "type": "kafka",
    "config": {
      "topic": "nyc-fraud.poc",
      "connectionString": "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181",
      "clientId": "nyc-fraud",
      "zkOptions": {
        "sessionTimeout": 30000,
        "spinDelay": 1000,
        "retries": 10
      }
    }
  }
}
Example config file for an HTTP target:

{
  "filename": "nyc-fraud.json",
  "dataSchema": "integer,string,double,string,double,double,string,double,double,integer,integer",
  "chunkSize": 1000,
  "triggerInterval": 200,
  "loggerEnabled": false,
  "target": {
    "type": "http",
    "config": {
      "hostname": "127.0.0.1",
      "port": 12345,
      "method": "POST",
      "path": "/nyc-fraud",
      "headers": {
        "contentType": "application/json"
      }
    }
  }
}
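
To see what the HTTP target sends, here is a minimal receiver matching the config above, built with Node's standard http module. This is a sketch: the exact shape of each request body depends on chunkSize and the schema, so it simply logs whatever arrives.

var http = require('http');

// Minimal receiver for the HTTP target config above.
http.createServer(function (req, res) {
    if (req.method === 'POST' && req.url === '/nyc-fraud') {
        var body = '';
        req.on('data', function (chunk) { body += chunk; });
        req.on('end', function () {
            console.log('received:', body); // streamed JSON records
            res.writeHead(200);
            res.end();
        });
    } else {
        res.writeHead(404);
        res.end();
    }
}).listen(12345, '127.0.0.1');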
filename: Name or path of the data file
dataSchema: Data types of the data's columns
target: Configuration of the stream target
chunkSize: Number of records written per triggerInterval
triggerInterval: Period between writes
loggerEnabled: Flag for logging to a file or the console
loggerType: Type of logger; can be file or console
logFilename: If the logger type is file, the logger creates a log file with this name (see the example after this list).
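
The two example configs above leave logging off. Here is a sketch of the Kafka config with file logging turned on, assuming loggerType takes the literal strings "file" or "console" and that logFilename is only needed for file logging (both assumptions drawn from the descriptions above):

{
  "filename": "nyc-fraud.json",
  "dataSchema": "integer,string,double,string,double,double,string,double,double,integer,integer",
  "chunkSize": 1000,
  "triggerInterval": 200,
  "loggerEnabled": true,
  "loggerType": "file",
  "logFilename": "datastreamer.log",
  "target": {
    "type": "kafka",
    "config": {
      "topic": "nyc-fraud.poc",
      "connectionString": "zookeeper1:2181",
      "clientId": "nyc-fraud"
    }
  }
}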