streaming-cache
Advanced tools
Comparing version 0.4.1 to 0.5.0
'use strict'; | ||
var fs = require('fs'); | ||
var Cache = require('../index.js'); | ||
var cache = new Cache({max: 5, maxAge: 15}); | ||
var fs = require('fs'); | ||
var cnt = 0; | ||
var s = cache.set('a'); | ||
var stream = cache.set('a'); | ||
var counter = 0; | ||
var intervalId = setInterval(function () { | ||
if (cnt >= 50) { | ||
if (counter >= 50) { | ||
clearInterval(intervalId); | ||
s.end(); | ||
stream.end(); | ||
} else { | ||
s.write(cnt + ' hello\n'); | ||
cnt++; | ||
stream.write(counter + ' hello\n'); | ||
counter++; | ||
} | ||
}, 100); | ||
s.pipe(process.stdout); | ||
stream.pipe(process.stdout); | ||
cache.get('a').pipe(process.stdout); | ||
'use strict'; | ||
var fs = require('fs'); | ||
var Cache = require('../index.js'); | ||
var cache = new Cache(); | ||
var fs = require('fs'); | ||
var readstream = fs.createReadStream('readme.md'); | ||
var readstream = fs.createReadStream('../readme.md'); | ||
var writestream = fs.createWriteStream('test2.md'); | ||
@@ -11,0 +10,0 @@ var writestream2 = fs.createWriteStream('test3.md'); |
255
index.js
'use strict'; | ||
var LRU = require('lru-cache') ; | ||
var EventEmitter = require('events').EventEmitter; | ||
var STATUS_PENDING = 1; | ||
var STATUS_DONE = 2; | ||
var LRU = require('lru-cache'); | ||
var EventEmitter = require('events').EventEmitter; | ||
var LinkedList = require('linkedlist'); | ||
var Streams = require('stream'); | ||
var ReadStream = require('./lib/readStream'); | ||
var Streams = require('stream'); | ||
var assign = require('lodash.assign'); | ||
var utils = require('./lib/utils'); | ||
var LinkedList = require('linkedlist'); | ||
var emitters = {}; | ||
var lruOptions = { | ||
length: function (cachedObject) { | ||
if (cachedObject.data) { | ||
return cachedObject.data.length; | ||
} | ||
else { | ||
return 0; | ||
} | ||
} | ||
var DEFAULT_LENGTH = function (value) { | ||
return value.data ? value.data.length : 1; | ||
}; | ||
var StreamingCache = function StreamingCache(options) { | ||
options = options || {}; | ||
options.length = lruOptions.length; | ||
this.cache = LRU(options); | ||
Object.defineProperty(this, 'length', { | ||
get: function () { | ||
return this.cache.length; | ||
this.cache = LRU(assign({ length: DEFAULT_LENGTH }, options)); | ||
this.emitters = {}; | ||
Object.defineProperties(this, { | ||
'length': { | ||
get: function () { | ||
return this.cache.length; | ||
} | ||
}, | ||
'itemCount': { | ||
get: function () { | ||
return this.cache.itemCount; | ||
} | ||
} | ||
}); | ||
Object.defineProperty(this, 'itemCount', { | ||
get: function () { | ||
return this.cache.itemCount; | ||
} | ||
}); | ||
} | ||
}; | ||
StreamingCache.prototype.setData = function (key, data) { | ||
var self = this; | ||
checkKey(key); | ||
utils.ensureDefined(key, 'Key'); | ||
var c = {}; | ||
c.data = data; | ||
c.metadata = self.getMetadata(key) || {}; | ||
this.cache.set(key, { | ||
data: data, | ||
metadata: this.getMetadata(key) || {}, | ||
status: STATUS_DONE | ||
}); | ||
c.status = STATUS_DONE; | ||
self.cache.set(key, c); | ||
return this; | ||
} | ||
}; | ||
StreamingCache.prototype.getData = function (key, cb) { | ||
StreamingCache.prototype.getData = function (key, callback) { | ||
utils.ensureDefined(key, 'Key'); | ||
utils.ensureDefined(callback, 'Callback'); | ||
var self = this; | ||
checkKey(key); | ||
var hit = self.cache.get(key); | ||
if (!cb) { | ||
throw(new Error('callback expected')); | ||
if (!hit) { | ||
return callback('Cache miss'); | ||
} | ||
var object = this.cache.get(key); | ||
if (!object) { | ||
cb('cache miss'); | ||
if (hit.status === STATUS_DONE) { | ||
return callback(null, hit.data); | ||
} | ||
else if (object.status === STATUS_PENDING) { | ||
emitters[key].on('error', function (err) { | ||
cb(err); | ||
}) | ||
emitters[key].on('end', function (data) { | ||
cb(null, self.cache.get(key).data); | ||
}) | ||
} | ||
else { | ||
cb(null, object.data); | ||
return; | ||
} | ||
} | ||
self.emitters[key].on('error', function (error) { | ||
callback(error); | ||
}) | ||
self.emitters[key].on('end', function (data) { | ||
callback(null, self.cache.get(key).data); | ||
}) | ||
}; | ||
StreamingCache.prototype.setMetadata = function (key, metadata) { | ||
checkKey(key); | ||
utils.ensureDefined(key, 'Key'); | ||
var data = this.cache.get(key); | ||
if (!data) { | ||
data = {}; | ||
} | ||
data.metadata = metadata; | ||
var data = assign({}, this.cache.get(key), { metadata: metadata }) | ||
this.cache.set(key, data); | ||
@@ -91,20 +80,13 @@ }; | ||
StreamingCache.prototype.getMetadata = function (key) { | ||
checkKey(key); | ||
utils.ensureDefined(key, 'Key'); | ||
var data = this.cache.get(key); | ||
if (data && data.metadata) { | ||
return data.metadata; | ||
} | ||
return null; | ||
var hit = this.cache.get(key); | ||
return hit && hit.metadata ? hit.metadata : null; | ||
}; | ||
StreamingCache.prototype.exists = function (key) { | ||
checkKey(key); | ||
var object = this.cache.get(key); | ||
if (object && object.status) { | ||
return true; | ||
} | ||
else { | ||
return false; | ||
} | ||
utils.ensureDefined(key, 'Key'); | ||
var hit = this.cache.get(key); | ||
return !!(hit && hit.status); | ||
}; | ||
@@ -115,80 +97,68 @@ | ||
}; | ||
function checkKey(key) { | ||
if (!key) { | ||
throw(new Error('Key expected')); | ||
} | ||
} | ||
StreamingCache.prototype.returnPendingStream = function (key) { | ||
var self = this; | ||
var stream = new ReadStream(); | ||
emitters[key].on('error', function (error) { | ||
self.emitters[key].on('error', function (error) { | ||
stream.emit('error', error); | ||
}); | ||
emitters[key].on('end', function (data) { | ||
self.emitters[key].on('end', function (data) { | ||
stream.setBuffer(data); | ||
}); | ||
self.emitters[key].on('data', function (chunk) { | ||
stream.updateBuffer(Buffer.concat(self.emitters[key]._buffer)); | ||
}); | ||
stream.updateBuffer(Buffer.concat(emitters[key]._buffer)); | ||
emitters[key].on('data', function (chunk) { | ||
stream.updateBuffer(Buffer.concat(emitters[key]._buffer)); | ||
}); | ||
stream.updateBuffer(Buffer.concat(self.emitters[key]._buffer)); | ||
return stream; | ||
} | ||
}; | ||
StreamingCache.prototype.get = function (key) { | ||
checkKey(key); | ||
utils.ensureDefined(key, 'Key'); | ||
var object = this.cache.get(key); | ||
var stream; | ||
if (!object) { | ||
var hit = this.cache.get(key); | ||
if (!hit) { | ||
return undefined; | ||
} | ||
else if (object.status === STATUS_PENDING) { | ||
if (hit.status === STATUS_PENDING) { | ||
return this.returnPendingStream(key); | ||
} | ||
else { | ||
stream = new ReadStream(); | ||
stream.setBuffer(object.data); | ||
return stream; | ||
} | ||
}; | ||
StreamingCache.prototype.reset = function () { | ||
this.cache.reset(); | ||
var stream = new ReadStream(); | ||
stream.setBuffer(hit.data); | ||
return stream; | ||
}; | ||
StreamingCache.prototype.set = function (key) { | ||
utils.ensureDefined(key, 'Key'); | ||
var self = this; | ||
checkKey(key); | ||
var metadata = self.getMetadata(key) || {}; | ||
self.cache.set(key, { status : STATUS_PENDING, metadata: metadata}); | ||
emitters[key] = new EventEmitter(); | ||
emitters[key].setMaxListeners(250); | ||
emitters[key]._buffer = []; | ||
self.cache.set(key, { status: STATUS_PENDING, metadata: metadata }); | ||
self.emitters[key] = new EventEmitter(); | ||
self.emitters[key].setMaxListeners(250); | ||
self.emitters[key]._buffer = []; | ||
var chunks = new LinkedList(); | ||
var stream = new Streams.Duplex() | ||
stream._read = function () { | ||
if(!chunks){ | ||
this.needRead = true; | ||
return; | ||
} | ||
var chunk = chunks.shift(); | ||
if (!chunk) { | ||
this.needRead = true; | ||
} | ||
else { | ||
if (chunk) { | ||
this.push(chunk); | ||
this.needRead = false; | ||
} else { | ||
this.needRead = true; | ||
} | ||
} | ||
stream._write = function (chunk, encoding, next) { | ||
emitters[key]._buffer.push(chunk); | ||
emitters[key].emit('data', chunk); | ||
}; | ||
stream._write = function (chunk, encoding, next) { | ||
self.emitters[key]._buffer.push(chunk); | ||
self.emitters[key].emit('data', chunk); | ||
if (this.needRead) { | ||
this.push(chunk); | ||
this.needRead = false; | ||
} | ||
@@ -203,9 +173,10 @@ else { | ||
self.cache.del(key); | ||
if (emitters[key] && emitters[key]._events.error) { | ||
emitters[key].emit('error', err); | ||
if (self.emitters[key] && self.emitters[key]._events.error) { | ||
self.emitters[key].emit('error', err); | ||
} | ||
stream.removeAllListeners(); | ||
emitters[key].removeAllListeners(); | ||
delete emitters[key]; | ||
self.emitters[key].removeAllListeners(); | ||
delete self.emitters[key]; | ||
}); | ||
stream.on('finish', function () { | ||
@@ -218,18 +189,20 @@ if (this.needRead) { | ||
} | ||
var c = self.cache.get(key); | ||
chunks = null; | ||
if (!c) { | ||
emitters[key].emit('end', Buffer.concat(emitters[key]._buffer)); | ||
delete emitters[key]; | ||
return; | ||
var hit = self.cache.get(key); | ||
if (hit) { | ||
var buffer = Buffer.concat(self.emitters[key]._buffer); | ||
hit.metadata = hit.metadata || {}; | ||
utils.assign(hit.metadata, { | ||
length: buffer.toString().length, | ||
byteLength: buffer.byteLength | ||
}); | ||
utils.assign(hit, { | ||
data: buffer, | ||
status: STATUS_DONE | ||
}); | ||
self.cache.set(key, hit); | ||
} | ||
var buffer = Buffer.concat(emitters[key]._buffer); | ||
c.metadata = c.metadata || {}; | ||
c.metadata.length = buffer.toString().length; | ||
c.metadata.byteLength = buffer.byteLength; | ||
c.data = buffer; | ||
c.status = STATUS_DONE; | ||
self.cache.set(key, c); | ||
emitters[key].emit('end', Buffer.concat(emitters[key]._buffer)); | ||
delete emitters[key]; | ||
self.emitters[key].emit('end', Buffer.concat(self.emitters[key]._buffer)); | ||
delete self.emitters[key]; | ||
}); | ||
@@ -239,2 +212,6 @@ return stream; | ||
StreamingCache.prototype.reset = function () { | ||
this.cache.reset(); | ||
}; | ||
module.exports = StreamingCache; |
@@ -12,2 +12,3 @@ 'use strict'; | ||
this.readable = false; | ||
this.complete = false; | ||
this._object = new Buffer(0); | ||
@@ -26,6 +27,4 @@ } | ||
ReadStream.prototype.complete = false; | ||
ReadStream.prototype._push = function (size) { | ||
if (this.ended) { | ||
if (this.ended || !this._object.length) { | ||
return; | ||
@@ -38,15 +37,12 @@ } | ||
} | ||
if (!size) {size = 24 * 1024;} | ||
if (!this._object.length) { | ||
return; | ||
} | ||
if (this._offset + size > this._object.length) { | ||
size = this._object.length - this._offset; | ||
} | ||
size = size || 24 * 1024; | ||
size = Math.min(size, this._object.length - this._offset); | ||
if (this._offset < this._object.length) { | ||
var offset = this._offset; | ||
this.push(this._object.slice(this._offset, this._offset + size)); | ||
this._offset += size ; | ||
this.push(this._object.slice(offset, (offset + size))); | ||
} | ||
}; | ||
ReadStream.prototype._read = function (size) { | ||
@@ -53,0 +49,0 @@ this._push(size); |
{ | ||
"name": "streaming-cache", | ||
"version": "0.4.1", | ||
"version": "0.5.0", | ||
"description": "Cache and replay NodeJS streams", | ||
@@ -19,2 +19,3 @@ "main": "index.js", | ||
"linkedlist": "^1.0.1", | ||
"lodash.assign": "^4.0.1", | ||
"lru-cache": "^2.7.0" | ||
@@ -21,0 +22,0 @@ }, |
@@ -80,3 +80,3 @@ /* jshint jasmine: true */ | ||
expect(data).toEqual(undefined); | ||
expect(err).toEqual('cache miss'); | ||
expect(err).toEqual('Cache miss'); | ||
done(); | ||
@@ -83,0 +83,0 @@ })); |
488501
37
879
3
+ Addedlodash.assign@^4.0.1
+ Addedlodash.assign@4.2.0(transitive)