
datastreamer


Latest version: 1.2.8
Maintainers: 1

DataStreamer

Data streamer for streaming analytics test platforms. Takes batch CSV and JSON data and streams it to targets (Kafka, HTTP).

Installation

DataStreamer at NPM (version 1.2.7)

Install with NPM: npm install datastreamer

Details of DataStreamer

new DataStreamer(configName, lineListener, pauseListener, resumeListener, streamListener, extraFields);

Parameters

  • configName: Name or path of the JSON config file, without the extension (if your config file is 'config.json', configName is 'config').

  • lineListener: Callback function triggered each time a line is read from the data file. It receives these parameters:

    • fileStream: The file stream, which you can use to pause, resume, or close the stream.

    • fieldNames: Field names of the data. You can add extra fields by passing their names as an array in the extraFields parameter of the DataStreamer constructor.

    • fieldValues: Field values of the current line. Push the values of any extra fields onto this array.

    • jsonGenerator: JSON generator for the data schema given in the config file's dataSchema attribute.

    function lineListener(fileStream,fieldNames,fieldValues,jsonGenerator) { /* your implementation */ }
  • pauseListener: Callback function triggered when the stream is paused.
    function pauseListener() { /* your implementation */ }
  • resumeListener: Callback function triggered when the stream is resumed.
    function resumeListener() { /* your implementation */ }
  • streamListener: Callback function triggered when data is streamed to Kafka. It receives kafkaBuffer and fileStream as parameters:

    • kafkaBuffer: Buffer that the file stream writes data to; DataStreamer reads this buffer and sends its contents to Kafka.
    • fileStream: The file stream, which you can use to pause, resume, or close the stream.
    function streamListener(kafkaBuffer,fileStream) { /* your implementation */ }
  • extraFields: Array of additional field names added to fieldNames; lineListener pushes their values onto fieldValues. Pass null for any listener you do not need, as the usage example does.
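The README describes jsonGenerator but does not show how it maps fields to JSON. As a rough mental model only, the sketch below assumes it pairs each field name with its value and coerces the value according to the matching dataSchema type, with extra-field values appended at the end of fieldValues. makeJsonGenerator is a hypothetical stand-in, not datastreamer's implementation, and the field names and values are illustrative.

```javascript
// Hypothetical stand-in for datastreamer's jsonGenerator; NOT the library's code.
// Assumption: each value is coerced by its dataSchema type, and values for
// extra fields (named via extraFields) are pushed onto the end of fieldValues.
function makeJsonGenerator(dataSchema) {
  var types = dataSchema.split(",");

  function coerce(type, raw) {
    switch (type) {
      case "integer":
        return parseInt(raw, 10);
      case "double":
        return parseFloat(raw);
      default: // "string" and any unknown type are kept as text
        return String(raw);
    }
  }

  return {
    generateJSON: function (fieldNames, fieldValues) {
      var record = {};
      for (var i = 0; i < fieldNames.length; ++i) {
        record[fieldNames[i]] = coerce(types[i], fieldValues[i]);
      }
      return JSON.stringify(record);
    }
  };
}

// Illustrative line: three columns read from the file plus one extra field.
var generator = makeJsonGenerator("integer,string,double,integer");
var fieldNames = ["step", "type", "amount", "tx_id"]; // "tx_id" plays the extra field
var fieldValues = ["1", "PAYMENT", "9839.64"];
fieldValues.push(42); // value for the extra field, as in the usage example

var json = generator.generateJSON(fieldNames, fieldValues);
console.log(json); // {"step":1,"type":"PAYMENT","amount":9839.64,"tx_id":42}
```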

Usage of DataStreamer

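var DataStreamer = require('datastreamer');
var begin = Date.now();

// Shared state: records waiting to be sent, the replay start time and a transaction counter.
var vars = {
    "queue": [],
    "timestamp": begin,
    "tx_id": 1
};

// Config name without extension; the file itself must be named "paysim-config.json".
// The two extra fields receive their values in lineListener.
var dataStreamer = new DataStreamer("paysim-config",
                                    lineListener,
                                    null,   // pauseListener
                                    null,   // resumeListener
                                    null,   // streamListener
                                    ["timestamp", "tx_id"]);

// Called for each line read from the data file.
function lineListener(fileStream, fieldNames, fieldValues, jsonGenerator) {
    // Record time is begin + tx_id, so consecutive records are spaced 1 ms apart.
    var timestamp = vars["timestamp"] + vars["tx_id"];

    // Values for the extra fields, in the same order as extraFields.
    fieldValues.push(timestamp);
    fieldValues.push(vars["tx_id"]);

    var data = jsonGenerator.generateJSON(fieldNames, fieldValues);

    if (timestamp < Date.now()) {
        // Already due: send immediately.
        dataStreamer.pushToKafka(data);
    } else {
        // Not yet due: park it until checkSendingTime sends it.
        vars["queue"].push({ "timestamp": timestamp, "data": data });
    }

    ++vars["tx_id"];
}

// Polls the queue and sends every record whose time has come, in queue order.
function checkSendingTime() {
    var currentTime = Date.now();

    for (var i = 0; i < vars["queue"].length; ++i) {
        if (vars["queue"][i]["timestamp"] <= currentTime) {
            dataStreamer.pushToKafka(vars["queue"][i]["data"]);
            vars["queue"].splice(i, 1);
            --i; // the next item has shifted into index i
        }
    }

    setTimeout(checkSendingTime, 1);
}

dataStreamer.startStream();
checkSendingTime();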
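The usage example replays records in real time: a record whose computed timestamp is still in the future waits in vars.queue, and checkSendingTime flushes it once it is due. The same flushing logic, pulled out of Kafka so it can be tested on its own, might look like the sketch below. drainDue and its send callback are hypothetical names; like the original loop, it sends due items in queue order.

```javascript
// Hypothetical helper mirroring checkSendingTime from the usage example.
// Sends every queued item whose timestamp is due (in queue order) via `send`
// and returns the items that are still waiting.
function drainDue(queue, now, send) {
  var pending = [];
  queue.forEach(function (item) {
    if (item.timestamp <= now) {
      send(item.data);
    } else {
      pending.push(item);
    }
  });
  return pending;
}

// Example: at t=1000, two items are due and one is not.
var sent = [];
var queue = [
  { timestamp: 900, data: "a" },
  { timestamp: 1500, data: "b" },
  { timestamp: 1000, data: "c" }
];
queue = drainDue(queue, 1000, function (data) { sent.push(data); });
console.log(sent);         // [ 'a', 'c' ]
console.log(queue.length); // 1
```

In the usage example, checkSendingTime could then assign vars["queue"] = drainDue(vars["queue"], Date.now(), dataStreamer.pushToKafka.bind(dataStreamer)) before rescheduling itself. Building a new array avoids the index bookkeeping that splice requires; a polling interval longer than 1 ms would trade some timing precision for less CPU use.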

Configuration of DataStreamer

Config for Kafka target

{
  "filename": "nyc-fraud.json",
  "dataSchema": "integer,string,double,string,double,double,string,double,double,integer,integer",
  "chunkSize": 1000,
  "triggerInterval": 200,
  "loggerEnabled": false,
  "target": {
    "type": "kafka",
    "config": {
      "topic": "nyc-fraud.poc",
      "connectionString": "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181",
      "clientId": "nyc-fraud",
      "zkOptions": {
        "sessionTimeout": 30000,
        "spinDelay": 1000,
        "retries": 10
      }
    }
  }
}
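Per the configName description, a config file is referenced by its name without the .json extension. The sketch below loads a config under that convention and checks the attributes the example configs use. loadConfig is hypothetical, and its list of required keys is inferred from the examples in this README, not taken from the library's source.

```javascript
var fs = require("fs");
var path = require("path");
var os = require("os");

// Hypothetical loader: resolves "<configName>.json" and checks the keys the
// example configs use. datastreamer's own loading logic may differ.
function loadConfig(configName) {
  var file = path.resolve(configName + ".json");
  var config = JSON.parse(fs.readFileSync(file, "utf8"));

  ["filename", "dataSchema", "chunkSize", "triggerInterval", "target"].forEach(function (key) {
    if (!(key in config)) {
      throw new Error("missing config attribute: " + key);
    }
  });
  // The two target types documented in this README.
  if (config.target.type !== "kafka" && config.target.type !== "http") {
    throw new Error("unsupported target type: " + config.target.type);
  }
  return config;
}

// Usage: write a small Kafka config to a temp directory and load it by name.
var dir = fs.mkdtempSync(path.join(os.tmpdir(), "datastreamer-"));
var name = path.join(dir, "demo-config");
fs.writeFileSync(name + ".json", JSON.stringify({
  filename: "nyc-fraud.json",
  dataSchema: "integer,string,double",
  chunkSize: 1000,
  triggerInterval: 200,
  loggerEnabled: false,
  target: { type: "kafka", config: { topic: "nyc-fraud.poc" } }
}));

var config = loadConfig(name);
console.log(config.target.type); // kafka
```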

Config for HTTP target

{
  "filename": "nyc-fraud.json",
  "dataSchema": "integer,string,double,string,double,double,string,double,double,integer,integer",
  "chunkSize": 1000,
  "triggerInterval": 200,
  "loggerEnabled": false,
  "target": {
    "type": "http",
    "config": {
      "hostname": "127.0.0.1",
      "port": 12345,
      "method": "POST",
      "path": "/nyc-fraud",
      "headers": {
        "contentType": "application/json"
      }
    }   
  }
}
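The HTTP target's config keys (hostname, port, method, path, headers) match the option names of Node's http.request. The sketch below posts one JSON record using those options. Two assumptions: that records are sent one per request, and that the contentType key should become the standard Content-Type header (Node would otherwise send a header literally named contentType). Whether datastreamer itself does this translation is not stated here. toRequestOptions and postRecord are hypothetical helpers.

```javascript
var http = require("http");

// Hypothetical: build http.request options from a datastreamer-style HTTP target config.
function toRequestOptions(targetConfig, body) {
  var headers = {};
  var src = targetConfig.headers || {};
  Object.keys(src).forEach(function (key) {
    // Assumption: map the README's "contentType" key to the standard header name.
    headers[key === "contentType" ? "Content-Type" : key] = src[key];
  });
  headers["Content-Length"] = Buffer.byteLength(body);

  return {
    hostname: targetConfig.hostname,
    port: targetConfig.port,
    method: targetConfig.method,
    path: targetConfig.path,
    headers: headers
  };
}

// Hypothetical: POST a single JSON record; calls done(err, statusCode).
function postRecord(targetConfig, record, done) {
  var body = JSON.stringify(record);
  var req = http.request(toRequestOptions(targetConfig, body), function (res) {
    res.resume(); // discard the response body
    res.on("end", function () { done(null, res.statusCode); });
  });
  req.on("error", done);
  req.end(body);
}

var options = toRequestOptions({
  hostname: "127.0.0.1",
  port: 12345,
  method: "POST",
  path: "/nyc-fraud",
  headers: { contentType: "application/json" }
}, '{"tx_id":1}');
console.log(options.headers); // { 'Content-Type': 'application/json', 'Content-Length': 11 }
```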

Configuration attributes:

  • filename: Name or path of the data file

  • dataSchema: Comma-separated data types of the data's columns (integer, string, or double in the examples)

  • target: Configuration of the stream target; type is kafka or http, and config holds the target-specific settings

  • chunkSize: Number of records written in each triggerInterval

  • triggerInterval: Period between writes of data

  • loggerEnabled: Flag that enables logging to a file or the console

  • loggerType: Logger type, either file or console

  • logFilename: If loggerType is file, the logger creates a log file with this name
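The descriptions of chunkSize and triggerInterval suggest a simple pacing scheme: on every triggerInterval, up to chunkSize records are written. The sketch below implements that reading with setInterval and assumes triggerInterval is in milliseconds, which this README does not state. Under that assumption the example configs (chunkSize 1000, triggerInterval 200) would emit at most 1000 × (1000 / 200) = 5,000 records per second. toChunks, paceRecords and maxRecordsPerSecond are hypothetical helpers.

```javascript
// Hypothetical: split records into consecutive chunks of at most chunkSize.
function toChunks(records, chunkSize) {
  if (chunkSize < 1) throw new RangeError("chunkSize must be at least 1");
  var chunks = [];
  for (var i = 0; i < records.length; i += chunkSize) {
    chunks.push(records.slice(i, i + chunkSize));
  }
  return chunks;
}

// Hypothetical: write one chunk every triggerInterval ms, then call onDone.
function paceRecords(records, chunkSize, triggerInterval, write, onDone) {
  var chunks = toChunks(records, chunkSize);
  var next = 0;
  var timer = setInterval(function () {
    if (next >= chunks.length) {
      clearInterval(timer);
      onDone();
      return;
    }
    chunks[next++].forEach(function (record) { write(record); });
  }, triggerInterval);
}

// Upper bound on throughput implied by a config (assumes milliseconds).
function maxRecordsPerSecond(chunkSize, triggerIntervalMs) {
  return chunkSize * (1000 / triggerIntervalMs);
}

// Usage: 5 records, 2 per tick, one tick every 10 ms.
var written = [];
paceRecords([1, 2, 3, 4, 5], 2, 10, function (r) { written.push(r); }, function () {
  console.log(written); // [ 1, 2, 3, 4, 5 ]
});
console.log(maxRecordsPerSecond(1000, 200)); // 5000
```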


Package last updated on 09 Jun 2018
