New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

elasticsearch-stream-index

Package Overview
Dependencies
Maintainers
1
Versions
2
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

elasticsearch-stream-index - npm Package Compare versions

Comparing version

to
1.0.2

16

index.js

@@ -10,3 +10,3 @@ // node modules

try{
debug = require('debug')('StreamingElasticsearch:BulkStream');
debug = require('debug')('elasticsearch-stream-index:BulkStream');
} catch (err){}

@@ -18,5 +18,5 @@

}
Writable.call(this, { objectMode: true });
this.highWaterMark = (options && options.highWaterMark) || 64;
Writable.call(this, { objectMode: true, highWaterMark: this.highWaterMark });
this.es_client = es_client;
this.highWaterMark = (options && options.highWaterMark) || 64;
this.bulkCount = 0;

@@ -54,3 +54,3 @@ this.arr = [];

});
debug('_bulk bufArray', bufArray.length);
// debug('_bulk bufArray', bufArray.length);
// bulk insert

@@ -63,5 +63,13 @@ this.es_client.bulk({ body: bufArray }, function(err, resp){

debug('_bulk resp', resp);
// add fail handle
if (resp.errors === true) {
debug('_bulk error', resp.items);
self.emit('fail', resp.items);
}
self.bulkCount = 0;
self.nextChunkIsParams = false;
self.arr = [];
callback();

@@ -68,0 +76,0 @@ });

{
"name": "elasticsearch-stream-index",
"version": "1.0.1",
"version": "1.0.2",
"description": "a writable stream wrapped elasticsearch index operation to bulk",
"main": "index.js",
"scripts": {
"test": "node test/test.js"
"test": "DEBUG=* node test/test.js"
},

@@ -20,4 +20,4 @@ "repository": {

"dependencies": {
"elasticsearch": "^3.1.3",
"readable-stream": "^1.0.33"
"elasticsearch": "^11.0.0",
"readable-stream": "^2.0.6"
},

@@ -24,0 +24,0 @@ "devDependencies": {

@@ -1,2 +0,2 @@

# ElasticsearchStreamIndex ![io.js supported](https://img.shields.io/badge/io.js-supported-green.svg?style=flat) [![Build Status](https://travis-ci.org/topdmc/ElasticsearchStreamIndex.svg?branch=master)](https://travis-ci.org/topdmc/ElasticsearchStreamIndex)
# ElasticsearchStreamIndex ![io.js supported](https://img.shields.io/badge/io.js-supported-green.svg?style=flat) [![Build Status](https://travis-ci.org/topdmc/ElasticsearchStreamIndex.svg?branch=master)](https://travis-ci.org/topdmc/ElasticsearchStreamIndex) [![npm version](https://badge.fury.io/js/elasticsearch-stream-index.svg)](http://badge.fury.io/js/elasticsearch-stream-index)

@@ -12,4 +12,3 @@ A writable stream wrapped elasticsearch index operation to bulk

```javascript
var ElasticsearchStreamIndex = require('../index');
var ElasticsearchStreamIndex = require('elasticsearch-stream-index');
var elasticsearch = require('elasticsearch');

@@ -22,22 +21,53 @@

var es = new elasticsearch.Client(opt);
var idxName = 'cc_idx', typeName = 'cc_type';
var create_stream = new ElasticsearchStreamIndex(es, { highWaterMark: 2 });
create_stream.on('finish', function(){
console.log('---finished---');
// readable-stream
const Readable = require('stream').Readable;
util.inherits(Counter, Readable);
function Counter(opt) {
Readable.call(this, opt);
this._max = 1000;
this._index = 0;
}
Counter.prototype._read = function() {
var i = this._index++;
console.log('---| _read i=' + i);
if (i > this._max)
this.push(null);
else {
this.push({
index: idxName,
type: typeName,
id: i,
body : {
name: 'name _ ' + i
}
});
}
};
var rs = new Counter({ objectMode: true });
var es = new elasticsearch.Client({
host: 'localhost:9200',
log: 'error'
});
var idxName = 'cc_idx', typeName = 'cc_type';
var esi = new ElasticsearchStreamIndex(es, { highWaterMark: 128 });
esi.on('finish', function(){
assert.ok(true);
});
esi.on('pipe', function(src){
console.error('>> something is piping into me');
assert.equal(src, rs);
});
esi.on('unpipe', function(src){
console.error('>> something is unpiping into me');
assert.equal(src, rs);
});
esi.on('error', function(){ assert.ok(false); });
for(var i = 0; i < 10; i++){
create_stream.write({
index: idxName,
type: typeName,
id: i,
body : {
name: 'name _ ' + i
}
});
}
create_stream.end();
rs.pipe(esi);
```
var assert = require('assert');
var stream = require('stream');
var util = require('util');

@@ -6,29 +8,79 @@ var ElasticsearchStreamIndex = require('../index');

var elasticsearch = require('elasticsearch');
var idxName = 'test', typeName = 'cc_type';
var opt = {
host: 'localhost:9200',
log: 'error'
/* Generate a readable-stream demostrate data */
const Readable = require('stream').Readable;
util.inherits(Counter, Readable);
function Counter(opt) {
Readable.call(this, opt);
this._max = 50;
this._index = 0;
}
Counter.prototype._read = function() {
var i = this._index++;
console.log('---| _read i=' + i);
if (i > this._max)
this.push(null);
else {
this.push({
index: idxName,
type: typeName,
id: i,
body : {
name: 'name _ ' + i
}
});
}
};
var es = new elasticsearch.Client(opt);
/* Generate a transform stream */
function StringifyStream(options) {
if (!(this instanceof StringifyStream))
return new StringifyStream(options);
var create_stream = new ElasticsearchStreamIndex(es, { highWaterMark: 2 });
create_stream.on('finish', function(){
assert.ok(true);
options = options || {};
options.objectMode = true;
stream.Transform.call(this,options);
}
util.inherits(StringifyStream,stream.Transform);
StringifyStream.prototype._transform = function(d,e,callback) {
this.push(JSON.stringify(d));
this.push("\n");
callback();
};
/* try */
var rs = new Counter({ objectMode: true });
rs.on('readable', function(){
console.log(">>> rs.readable");
});
// rs.pipe(StringifyStream()).pipe(process.stdout);
create_stream.on('error', function(){ assert.ok(false); });
/* Create the writer */
var es = new elasticsearch.Client({
host: 'localhost:9200',
log: 'error'
});
var idxName = 'cc_idx', typeName = 'cc_type';
var esi = new ElasticsearchStreamIndex(es, { highWaterMark: 7 });
esi.on('finish', function(){
assert.ok(true);
});
esi.on('pipe', function(src){
console.log('>> something is piping into me');
assert.equal(src, rs);
});
esi.on('unpipe', function(src){
console.log('>> something is unpiping into me');
assert.equal(src, rs);
});
esi.on('error', function(){ assert.ok(false); });
for(var i = 0; i < 10; i++){
create_stream.write({
index: idxName,
type: typeName,
id: i,
body : {
name: 'name _ ' + i
}
});
}
create_stream.end();
/* Testing */
rs.pipe(esi);
// esi.end();

Sorry, the diff of this file is not supported yet