fluent-logger
Advanced tools
Comparing version 2.4.0 to 2.4.1
@@ -30,3 +30,2 @@ 'use strict'; | ||
this._sendQueue = []; // queue for items waiting for being sent. | ||
this._sendQueueTail = -1; | ||
this._eventEmitter = new EventEmitter(); | ||
@@ -79,3 +78,2 @@ } | ||
self._sendQueue.push(item); | ||
self._sendQueueTail++; | ||
self._connect(function(){ | ||
@@ -191,9 +189,27 @@ self._flushSendQueue(); | ||
var self = this; | ||
var pos = self._sendQueue.length - self._sendQueueTail - 1; | ||
var item = self._sendQueue[pos]; | ||
if (self._flushingSendQueue) | ||
return; | ||
self._flushingSendQueue = true; | ||
process.nextTick(function waitToWrite() { | ||
if (!self._socket) { | ||
self._flushingSendQueue = false; | ||
return; | ||
} | ||
if (self._socket.writable) { | ||
self._doFlushSendQueue(); | ||
} else { | ||
process.nextTick(waitToWrite); | ||
} | ||
}); | ||
}; | ||
FluentSender.prototype._doFlushSendQueue = function() { | ||
var self = this; | ||
var item = self._sendQueue.shift(); | ||
if (item === undefined) { | ||
self._flushingSendQueue = false; | ||
// nothing written; | ||
} else { | ||
self._sendQueueTail--; | ||
self._sendQueue.shift(); | ||
self._socket.write(new Buffer(item.packet), function(){ | ||
@@ -206,2 +222,3 @@ if (self.requireAckResponse) { | ||
clearInterval(intervalId); | ||
clearTimeout(timeoutId); | ||
if (response.ack !== item.options.chunk) { | ||
@@ -212,5 +229,9 @@ var error = new FluentLoggerError.ResponseError('ack in response and chunk id in sent data are different', | ||
} | ||
item.callback && item.callback(); | ||
process.nextTick(function() { | ||
self._doFlushSendQueue(); // if socket is still available | ||
}); | ||
} | ||
}, 100); | ||
setTimeout(function() { | ||
}, 10); | ||
var timeoutId = setTimeout(function() { | ||
var error = new FluentLoggerError.ResponseTimeout('ack response timeout'); | ||
@@ -220,11 +241,9 @@ self._handleEvent('error', error, item.callback); | ||
}, self.ackResponseTimeout); | ||
} else { | ||
item.callback && item.callback(); | ||
process.nextTick(function() { | ||
self._doFlushSendQueue(); // if socket is still available | ||
}); | ||
} | ||
item.callback && item.callback(); | ||
}); | ||
process.nextTick(function() { | ||
// socket is still available | ||
if (self._socket && self._socket.writable) { | ||
self._flushSendQueue(); | ||
} | ||
}); | ||
// TODO: how should we recorver if dequeued items are not sent. | ||
@@ -231,0 +250,0 @@ } |
@@ -85,3 +85,3 @@ 'use strict'; | ||
server.close(function(){ | ||
_callback(messages); | ||
_callback && _callback(messages); | ||
}); | ||
@@ -88,0 +88,0 @@ }, 100); |
{ | ||
"name": "fluent-logger", | ||
"version": "2.4.0", | ||
"version": "2.4.1", | ||
"main": "./lib/index.js", | ||
@@ -36,14 +36,9 @@ "scripts": { | ||
"devDependencies": { | ||
"async": "", | ||
"chai": "", | ||
"log4js": "<2", | ||
"mocha": "", | ||
"chai": "", | ||
"log4js": "", | ||
"async": "", | ||
"winston": "" | ||
}, | ||
"licenses": [ | ||
{ | ||
"type": "Apache", | ||
"url": "http://www.apache.org/licenses/LICENSE-2.0.html" | ||
} | ||
], | ||
"license": "Apache-2.0", | ||
"keywords": [ | ||
@@ -50,0 +45,0 @@ "logger", |
@@ -5,2 +5,4 @@ # fluent-logger for Node.js | ||
[![NPM](https://nodei.co/npm/fluent-logger.png?downloads=true&downloadRank=true)](https://nodei.co/npm/fluent-logger/) | ||
[![Build Status](https://secure.travis-ci.org/fluent/fluent-logger-node.png?branch=master,develop)](http://travis-ci.org/fluent/fluent-logger-node) | ||
@@ -7,0 +9,0 @@ |
@@ -565,2 +565,33 @@ var expect = require('chai').expect; | ||
it('should process messages step by step on requireAckResponse=true', function(done) { | ||
runServer({ requireAckResponse: true }, function(server, finish) { | ||
var s = new sender.FluentSender('debug', { | ||
port: server.port, | ||
timeout: 3.0, | ||
reconnectInterval: 600000, | ||
requireAckResponse: true | ||
}); | ||
var errors = []; | ||
s.on('error', function(err) { | ||
errors.push(count+': '+err); | ||
}); | ||
var maxCount = 20; | ||
var count = 0; | ||
var sendMessage = function() { | ||
var time = Math.round(Date.now() / 1000); | ||
var data = { | ||
count: count | ||
}; | ||
s.emit('test', data, time); | ||
count++; | ||
if (count > maxCount) { | ||
clearInterval(timer); | ||
finish(); | ||
expect(errors.join('\n')).to.be.equal(''); | ||
done(); | ||
} | ||
}; | ||
var timer = setInterval(sendMessage, 10); | ||
}); | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
72638
1289
255