elasticsearch-stream-index
Advanced tools
Comparing version
16
index.js
@@ -10,3 +10,3 @@ // node modules | ||
try{ | ||
debug = require('debug')('StreamingElasticsearch:BulkStream'); | ||
debug = require('debug')('elasticsearch-stream-index:BulkStream'); | ||
} catch (err){} | ||
@@ -18,5 +18,5 @@ | ||
} | ||
Writable.call(this, { objectMode: true }); | ||
this.highWaterMark = (options && options.highWaterMark) || 64; | ||
Writable.call(this, { objectMode: true, highWaterMark: this.highWaterMark }); | ||
this.es_client = es_client; | ||
this.highWaterMark = (options && options.highWaterMark) || 64; | ||
this.bulkCount = 0; | ||
@@ -54,3 +54,3 @@ this.arr = []; | ||
}); | ||
debug('_bulk bufArray', bufArray.length); | ||
// debug('_bulk bufArray', bufArray.length); | ||
// bulk insert | ||
@@ -63,5 +63,13 @@ this.es_client.bulk({ body: bufArray }, function(err, resp){ | ||
debug('_bulk resp', resp); | ||
// add fail handle | ||
if (resp.errors === true) { | ||
debug('_bulk error', resp.items); | ||
self.emit('fail', resp.items); | ||
} | ||
self.bulkCount = 0; | ||
self.nextChunkIsParams = false; | ||
self.arr = []; | ||
callback(); | ||
@@ -68,0 +76,0 @@ }); |
{ | ||
"name": "elasticsearch-stream-index", | ||
"version": "1.0.1", | ||
"version": "1.0.2", | ||
"description": "a writable stream wrapped elasticsearch index operation to bulk", | ||
"main": "index.js", | ||
"scripts": { | ||
"test": "node test/test.js" | ||
"test": "DEBUG=* node test/test.js" | ||
}, | ||
@@ -20,4 +20,4 @@ "repository": { | ||
"dependencies": { | ||
"elasticsearch": "^3.1.3", | ||
"readable-stream": "^1.0.33" | ||
"elasticsearch": "^11.0.0", | ||
"readable-stream": "^2.0.6" | ||
}, | ||
@@ -24,0 +24,0 @@ "devDependencies": { |
@@ -1,2 +0,2 @@ | ||
# ElasticsearchStreamIndex  [](https://travis-ci.org/topdmc/ElasticsearchStreamIndex) | ||
# ElasticsearchStreamIndex  [](https://travis-ci.org/topdmc/ElasticsearchStreamIndex) [](http://badge.fury.io/js/elasticsearch-stream-index) | ||
@@ -12,4 +12,3 @@ A writable stream wrapped elasticsearch index operation to bulk | ||
```javascript | ||
var ElasticsearchStreamIndex = require('../index'); | ||
var ElasticsearchStreamIndex = require('elasticsearch-stream-index'); | ||
var elasticsearch = require('elasticsearch'); | ||
@@ -22,22 +21,53 @@ | ||
var es = new elasticsearch.Client(opt); | ||
var idxName = 'cc_idx', typeName = 'cc_type'; | ||
var create_stream = new ElasticsearchStreamIndex(es, { highWaterMark: 2 }); | ||
create_stream.on('finish', function(){ | ||
console.log('---finished---'); | ||
// readable-stream | ||
const Readable = require('stream').Readable; | ||
util.inherits(Counter, Readable); | ||
function Counter(opt) { | ||
Readable.call(this, opt); | ||
this._max = 1000; | ||
this._index = 0; | ||
} | ||
Counter.prototype._read = function() { | ||
var i = this._index++; | ||
console.log('---| _read i=' + i); | ||
if (i > this._max) | ||
this.push(null); | ||
else { | ||
this.push({ | ||
index: idxName, | ||
type: typeName, | ||
id: i, | ||
body : { | ||
name: 'name _ ' + i | ||
} | ||
}); | ||
} | ||
}; | ||
var rs = new Counter({ objectMode: true }); | ||
var es = new elasticsearch.Client({ | ||
host: 'localhost:9200', | ||
log: 'error' | ||
}); | ||
var idxName = 'cc_idx', typeName = 'cc_type'; | ||
var esi = new ElasticsearchStreamIndex(es, { highWaterMark: 128 }); | ||
esi.on('finish', function(){ | ||
assert.ok(true); | ||
}); | ||
esi.on('pipe', function(src){ | ||
console.error('>> something is piping into me'); | ||
assert.equal(src, rs); | ||
}); | ||
esi.on('unpipe', function(src){ | ||
console.error('>> something is unpiping into me'); | ||
assert.equal(src, rs); | ||
}); | ||
esi.on('error', function(){ assert.ok(false); }); | ||
for(var i = 0; i < 10; i++){ | ||
create_stream.write({ | ||
index: idxName, | ||
type: typeName, | ||
id: i, | ||
body : { | ||
name: 'name _ ' + i | ||
} | ||
}); | ||
} | ||
create_stream.end(); | ||
rs.pipe(esi); | ||
``` |
var assert = require('assert'); | ||
var stream = require('stream'); | ||
var util = require('util'); | ||
@@ -6,29 +8,79 @@ var ElasticsearchStreamIndex = require('../index'); | ||
var elasticsearch = require('elasticsearch'); | ||
var idxName = 'test', typeName = 'cc_type'; | ||
var opt = { | ||
host: 'localhost:9200', | ||
log: 'error' | ||
/* Generate a readable-stream demostrate data */ | ||
const Readable = require('stream').Readable; | ||
util.inherits(Counter, Readable); | ||
function Counter(opt) { | ||
Readable.call(this, opt); | ||
this._max = 50; | ||
this._index = 0; | ||
} | ||
Counter.prototype._read = function() { | ||
var i = this._index++; | ||
console.log('---| _read i=' + i); | ||
if (i > this._max) | ||
this.push(null); | ||
else { | ||
this.push({ | ||
index: idxName, | ||
type: typeName, | ||
id: i, | ||
body : { | ||
name: 'name _ ' + i | ||
} | ||
}); | ||
} | ||
}; | ||
var es = new elasticsearch.Client(opt); | ||
/* Generate a transform stream */ | ||
function StringifyStream(options) { | ||
if (!(this instanceof StringifyStream)) | ||
return new StringifyStream(options); | ||
var create_stream = new ElasticsearchStreamIndex(es, { highWaterMark: 2 }); | ||
create_stream.on('finish', function(){ | ||
assert.ok(true); | ||
options = options || {}; | ||
options.objectMode = true; | ||
stream.Transform.call(this,options); | ||
} | ||
util.inherits(StringifyStream,stream.Transform); | ||
StringifyStream.prototype._transform = function(d,e,callback) { | ||
this.push(JSON.stringify(d)); | ||
this.push("\n"); | ||
callback(); | ||
}; | ||
/* try */ | ||
var rs = new Counter({ objectMode: true }); | ||
rs.on('readable', function(){ | ||
console.log(">>> rs.readable"); | ||
}); | ||
// rs.pipe(StringifyStream()).pipe(process.stdout); | ||
create_stream.on('error', function(){ assert.ok(false); }); | ||
/* Create the writer */ | ||
var es = new elasticsearch.Client({ | ||
host: 'localhost:9200', | ||
log: 'error' | ||
}); | ||
var idxName = 'cc_idx', typeName = 'cc_type'; | ||
var esi = new ElasticsearchStreamIndex(es, { highWaterMark: 7 }); | ||
esi.on('finish', function(){ | ||
assert.ok(true); | ||
}); | ||
esi.on('pipe', function(src){ | ||
console.log('>> something is piping into me'); | ||
assert.equal(src, rs); | ||
}); | ||
esi.on('unpipe', function(src){ | ||
console.log('>> something is unpiping into me'); | ||
assert.equal(src, rs); | ||
}); | ||
esi.on('error', function(){ assert.ok(false); }); | ||
for(var i = 0; i < 10; i++){ | ||
create_stream.write({ | ||
index: idxName, | ||
type: typeName, | ||
id: i, | ||
body : { | ||
name: 'name _ ' + i | ||
} | ||
}); | ||
} | ||
create_stream.end(); | ||
/* Testing */ | ||
rs.pipe(esi); | ||
// esi.end(); |
Sorry, the diff of this file is not supported yet
8145
40.12%155
49.04%72
71.43%+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
Updated
Updated