streaming-cache
Advanced tools
Comparing version 0.3.2 to 0.4.0
'use strict'; | ||
var Cache = require('../index.js'); | ||
var cache = new Cache(); | ||
var cache = new Cache({max: 5, maxAge: 15}); | ||
var fs = require('fs'); | ||
// var readstream = fs.createReadStream('readme.md'); | ||
// var writestream = fs.createWriteStream('test.md'); | ||
var writestream2 = fs.createWriteStream('test2.txt'); | ||
// readstream.pipe(cache.put('a')); | ||
// readstream.pipe(writestream); | ||
// readstream.pipe(process.stderr); | ||
// setTimeout(function(){ | ||
// writestream2.write('written from cache\n\n'); | ||
// cache.get('a').pipe(writestream2); | ||
// }, 200); | ||
// setTimeout(function(){}, 2000); | ||
//process.stdin.pipe(cache.put('a')); | ||
var cnt = 0; | ||
var s = cache.set('a') | ||
var s = cache.set('a'); | ||
var intervalId = setInterval(function () { | ||
if (cnt >= 5) { | ||
// console.log('cnt', cnt) | ||
clearInterval(intervalId) | ||
cache.get('a').pipe(process.stdout); | ||
if (cnt >= 50) { | ||
clearInterval(intervalId); | ||
s.end(); | ||
} else { | ||
s.write(cnt + ' hello', function () {}) | ||
s.write(cnt + ' hello\n'); | ||
cnt++; | ||
} | ||
}, 1000) | ||
}, 100); | ||
//setTimeout(function () { | ||
s.pipe(process.stdout); | ||
cache.get('a').pipe(process.stdout); | ||
// }, 5000); | ||
31
index.js
@@ -26,2 +26,4 @@ 'use strict'; | ||
var StreamingCache = function StreamingCache(options) { | ||
options = options || {}; | ||
options.length = lruOptions.length; | ||
this.cache = LRU(options); | ||
@@ -77,2 +79,3 @@ Object.defineProperty(this, 'length', { | ||
} | ||
StreamingCache.prototype.setMetadata = function (key, metadata) { | ||
@@ -174,8 +177,8 @@ checkKey(key); | ||
var chunk = chunks.shift(); | ||
if(!chunk){ | ||
if (!chunk) { | ||
this.needRead = true; | ||
} | ||
else{ | ||
this.push(chunk); | ||
this.needRead = false; | ||
else { | ||
this.push(chunk); | ||
this.needRead = false; | ||
} | ||
@@ -186,8 +189,7 @@ } | ||
emitters[key].emit('data', chunk); | ||
if(this.needRead){ | ||
if (this.needRead) { | ||
this.push(chunk); | ||
} | ||
else{ | ||
else { | ||
chunks.push(chunk); | ||
} | ||
@@ -207,11 +209,16 @@ next(null, chunk); | ||
stream.on('finish', function () { | ||
if(this.needRead){ | ||
if (this.needRead) { | ||
this.push(null); | ||
} | ||
else{ | ||
chunks.push(null); | ||
else { | ||
chunks.push(null); | ||
} | ||
var c = self.cache.get(key); | ||
var buffer = Buffer.concat(emitters[key]._buffer) | ||
chunks = null; | ||
if (!c) { | ||
emitters[key].emit('end', Buffer.concat(emitters[key]._buffer)); | ||
delete emitters[key]; | ||
return; | ||
} | ||
var buffer = Buffer.concat(emitters[key]._buffer); | ||
c.metadata = c.metadata || {}; | ||
@@ -218,0 +225,0 @@ c.metadata.length = buffer.toString().length; |
{ | ||
"name": "streaming-cache", | ||
"version": "0.3.2", | ||
"version": "0.4.0", | ||
"description": "Cache and replay NodeJS streams", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -6,10 +6,16 @@ [![Circle CI](https://circleci.com/gh/LaurentZuijdwijk/streaming-cache/tree/master.svg?style=svg)](https://circleci.com/gh/LaurentZuijdwijk/streaming-cache/tree/master) | ||
Speed up your services. | ||
Cache, queue and distribute streams immediately. Streams can be replayed immediately, even if the source is not finished. | ||
Uses a fixed size LRU-cache in the background. | ||
Usefull for caching (slow) streaming connections, such as S3 requests or complex database queries. | ||
We use it to stream data into the cache and make any waiting connections for the same data queue up until the data is in the cache. | ||
If there are for example 3 requests for a certain file in close succession, then we can pipe the result for the first request into the cache. The 2 other requests will receive a stream which will start even before the first one is finished. | ||
If there are for example 3 requests for a certain file in close succession, then we can pipe the result for the first request into the cache. The 2 other requests will receive a stream which will start when the first one is finished. | ||
Performance | ||
----------- | ||
Serving from this cache is extremely fast. On my local machine I get 2.5GB per second for a single process on localhost using AB. (4th gen i7). | ||
@@ -42,3 +48,8 @@ Installation | ||
Options | ||
------- | ||
For a list of options see: https://www.npmjs.com/package/lru-cache | ||
API | ||
@@ -48,3 +59,3 @@ --- | ||
##### set(key) | ||
returns a transform stream that can be piped to | ||
returns a Duplex stream | ||
``` | ||
@@ -55,3 +66,3 @@ fileStream.pipe(cache.set('key')).pipe(res); | ||
##### get(key):ReadableStream | ||
##### get(key) => ReadableStream | ||
@@ -66,3 +77,3 @@ ```javascript | ||
#### setData(key, data) | ||
#### setData(key, data) => WriteableStream | ||
A set data synchronously to stream at a later moment | ||
@@ -69,0 +80,0 @@ |
@@ -77,3 +77,3 @@ /* jshint jasmine: true */ | ||
expect(cache.getData).toThrow(); | ||
expect(function() { cache.getData('c') }).toThrow(); | ||
expect(function () { cache.getData('c') }).toThrow(); | ||
expect(cache.getData('c', function (err, data) { | ||
@@ -87,3 +87,3 @@ expect(data).toEqual(undefined); | ||
it('should handle setmetadata', function () { | ||
cache.setMetadata('abc', {a:'c'}); | ||
cache.setMetadata('abc', {a: 'c'}); | ||
cache.setData('abc', 'test'); | ||
@@ -96,3 +96,3 @@ expect(cache.setMetadata).toThrow(); | ||
it('should save the length to metadata', function (done) { | ||
cache.setMetadata('abc', {a:'b'}); | ||
cache.setMetadata('abc', {a: 'b'}); | ||
var s = cache.set('abc'); | ||
@@ -103,3 +103,3 @@ s.write('a'); | ||
setTimeout(function(){ | ||
setTimeout(function () { | ||
expect(cache.cache.get('abc').data.toString()).toEqual('ab'); | ||
@@ -123,3 +123,3 @@ expect(cache.getMetadata('abc').a).toEqual('b'); | ||
it('should reset the cache when called', function() { | ||
it('should reset the cache when called', function () { | ||
cache.setData('aaa', 'value'); | ||
@@ -144,3 +144,22 @@ cache.reset(); | ||
}); | ||
}); | ||
describe('streaming cache short timeout', function () { | ||
var s; | ||
beforeEach(function () { | ||
cache = new StreamingCache({maxAge: 100}) | ||
s = cache.set('b'); | ||
}); | ||
it('Writing to stream should set data', function (done) { | ||
s.write('a'); | ||
s.write('b'); | ||
s.end(null); | ||
setTimeout(function () { | ||
expect(s.read().toString() + s.read().toString()).toEqual('ab') | ||
cache.getData('b', function (err, data) { | ||
expect(data.toString()).toEqual(null) | ||
done(); | ||
}); | ||
}, 130); | ||
}); | ||
}); |
308461
529
96
20