streaming-cache
Advanced tools
Comparing version 0.2.1 to 0.3.0
36
index.js
'use strict'; | ||
var LRU = require('lru-cache') ; | ||
var Transform = require('stream').Transform; | ||
var EventEmitter = require('events').EventEmitter; | ||
var STATUS_PENDING = 1; | ||
@@ -13,2 +9,5 @@ var STATUS_DONE = 2; | ||
var ReadStream = require('./lib/readStream'); | ||
var Streams = require('stream'); | ||
var LinkedList = require('linkedlist'); | ||
var emitters = {}; | ||
@@ -47,2 +46,4 @@ | ||
c.data = data; | ||
c.metadata = self.getMetadata(key) || {}; | ||
c.status = STATUS_DONE; | ||
@@ -65,3 +66,2 @@ self.cache.set(key, c); | ||
else if (object.status === STATUS_PENDING) { | ||
emitters[key].on('error', function (err) { | ||
@@ -120,3 +120,3 @@ cb(err); | ||
StreamingCache.prototype.returnPendingStream = function(key){ | ||
StreamingCache.prototype.returnPendingStream = function (key) { | ||
var stream = new ReadStream(); | ||
@@ -143,3 +143,2 @@ emitters[key].on('error', function (error) { | ||
var stream; | ||
var self = this; | ||
@@ -159,3 +158,3 @@ if (!object) { | ||
StreamingCache.prototype.reset = function() { | ||
StreamingCache.prototype.reset = function () { | ||
this.cache.reset(); | ||
@@ -168,13 +167,20 @@ }; | ||
self.cache.set(key, {status : STATUS_PENDING, metadata: {}}); | ||
var metadata = self.getMetadata(key) || {}; | ||
self.cache.set(key, { status : STATUS_PENDING, metadata: metadata}); | ||
emitters[key] = new EventEmitter(); | ||
emitters[key].setMaxListeners(250); | ||
emitters[key]._buffer = []; | ||
var stream = new Transform(); | ||
stream._transform = function (chunk, encoding, done) { | ||
var chunks = new LinkedList(); | ||
var stream = new Streams.Duplex() | ||
stream._read = function () { | ||
this.push(chunks.shift()); | ||
} | ||
stream._write = function (chunk, encoding, next) { | ||
emitters[key]._buffer.push(chunk); | ||
emitters[key].emit('data', chunk); | ||
done(null, chunk); | ||
}; | ||
chunks.push(chunk); | ||
next(null, chunk); | ||
} | ||
@@ -191,6 +197,8 @@ stream.on('error', function (err) { | ||
stream.on('finish', function () { | ||
chunks.push(null); | ||
var c = self.cache.get(key); | ||
var buffer = Buffer.concat(emitters[key]._buffer) | ||
c.metadata = c.metadata || {}; | ||
c.metadata.length = buffer.length; | ||
c.metadata.length = buffer.toString().length; | ||
c.metadata.byteLength = buffer.byteLength; | ||
@@ -197,0 +205,0 @@ c.data = buffer; |
{ | ||
"name": "streaming-cache", | ||
"version": "0.2.1", | ||
"version": "0.3.0", | ||
"description": "Cache and replay NodeJS streams", | ||
@@ -15,5 +15,6 @@ "main": "index.js", | ||
"author": "Laurent Zuijdwijk", | ||
"repository" : "https://github.com/LaurentZuijdwijk/streaming-cache", | ||
"repository": "https://github.com/LaurentZuijdwijk/streaming-cache", | ||
"license": "BSD-2-Clause", | ||
"dependencies": { | ||
"linkedlist": "^1.0.1", | ||
"lru-cache": "^2.7.0" | ||
@@ -20,0 +21,0 @@ }, |
@@ -5,3 +5,3 @@ /* jshint jasmine: true */ | ||
var StreamingCache = require('../index'); | ||
var Transform = require('stream').Transform; | ||
var Duplex = require('stream').Duplex; | ||
@@ -12,2 +12,3 @@ var cache = new StreamingCache(); | ||
beforeEach(function () { | ||
cache = new StreamingCache() | ||
s = cache.set('a'); | ||
@@ -20,3 +21,3 @@ }) | ||
it('cache.set should return a stream', function () { | ||
expect(s).toEqual(jasmine.any(Transform)); | ||
expect(s).toEqual(jasmine.any(Duplex)); | ||
}); | ||
@@ -88,9 +89,23 @@ it('Writing to stream should set data', function (done) { | ||
it('should handle setmetadata', function () { | ||
cache.cache.set('abc', {data: 'test'}); | ||
cache.setMetadata('abc', {a:'c'}); | ||
cache.setData('abc', 'test'); | ||
expect(cache.setMetadata).toThrow(); | ||
cache.setMetadata('abc', 1234); | ||
expect(cache.cache.get('abc').data).toEqual('test'); | ||
expect(cache.getMetadata('abc')).toEqual(1234); | ||
expect(cache.getMetadata('abc').a).toEqual('c'); | ||
}); | ||
it('should save the length to metadata', function (done) { | ||
cache.setMetadata('abc', {a:'b'}); | ||
var s = cache.set('abc'); | ||
s.write('a'); | ||
s.write('b'); | ||
s.end(''); | ||
setTimeout(function(){ | ||
expect(cache.cache.get('abc').data.toString()).toEqual('ab'); | ||
expect(cache.getMetadata('abc').a).toEqual('b'); | ||
expect(cache.getMetadata('abc').length).toEqual(2); | ||
done(); | ||
}, 10) | ||
}); | ||
it('should handle getmetadata', function () { | ||
@@ -97,0 +112,0 @@ cache.cache.set('abc', {data: 1, metaData: 'bbb'}); |
21
500
307612
2
+ Addedlinkedlist@^1.0.1
+ Addedlinkedlist@1.0.1(transitive)