New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

streaming-cache

Package Overview
Dependencies
Maintainers
1
Versions
17
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

streaming-cache - npm Package Compare versions

Comparing version 0.4.1 to 0.5.0

coverage/coverage.json

22

examples/example.js
'use strict';
var fs = require('fs');
var Cache = require('../index.js');
var cache = new Cache({max: 5, maxAge: 15});
var fs = require('fs');
var cnt = 0;
var s = cache.set('a');
var stream = cache.set('a');
var counter = 0;
var intervalId = setInterval(function () {
if (cnt >= 50) {
if (counter >= 50) {
clearInterval(intervalId);
s.end();
stream.end();
} else {
s.write(cnt + ' hello\n');
cnt++;
stream.write(counter + ' hello\n');
counter++;
}
}, 100);
s.pipe(process.stdout);
stream.pipe(process.stdout);
cache.get('a').pipe(process.stdout);
'use strict';
var fs = require('fs');
var Cache = require('../index.js');
var cache = new Cache();
var fs = require('fs');
var readstream = fs.createReadStream('readme.md');
var readstream = fs.createReadStream('../readme.md');
var writestream = fs.createWriteStream('test2.md');

@@ -11,0 +10,0 @@ var writestream2 = fs.createWriteStream('test3.md');

'use strict';
var LRU = require('lru-cache') ;
var EventEmitter = require('events').EventEmitter;
var STATUS_PENDING = 1;
var STATUS_DONE = 2;
var LRU = require('lru-cache');
var EventEmitter = require('events').EventEmitter;
var LinkedList = require('linkedlist');
var Streams = require('stream');
var ReadStream = require('./lib/readStream');
var Streams = require('stream');
var assign = require('lodash.assign');
var utils = require('./lib/utils');
var LinkedList = require('linkedlist');
var emitters = {};
var lruOptions = {
length: function (cachedObject) {
if (cachedObject.data) {
return cachedObject.data.length;
}
else {
return 0;
}
}
var DEFAULT_LENGTH = function (value) {
return value.data ? value.data.length : 1;
};
var StreamingCache = function StreamingCache(options) {
options = options || {};
options.length = lruOptions.length;
this.cache = LRU(options);
Object.defineProperty(this, 'length', {
get: function () {
return this.cache.length;
this.cache = LRU(assign({ length: DEFAULT_LENGTH }, options));
this.emitters = {};
Object.defineProperties(this, {
'length': {
get: function () {
return this.cache.length;
}
},
'itemCount': {
get: function () {
return this.cache.itemCount;
}
}
});
Object.defineProperty(this, 'itemCount', {
get: function () {
return this.cache.itemCount;
}
});
}
};
StreamingCache.prototype.setData = function (key, data) {
var self = this;
checkKey(key);
utils.ensureDefined(key, 'Key');
var c = {};
c.data = data;
c.metadata = self.getMetadata(key) || {};
this.cache.set(key, {
data: data,
metadata: this.getMetadata(key) || {},
status: STATUS_DONE
});
c.status = STATUS_DONE;
self.cache.set(key, c);
return this;
}
};
StreamingCache.prototype.getData = function (key, cb) {
StreamingCache.prototype.getData = function (key, callback) {
utils.ensureDefined(key, 'Key');
utils.ensureDefined(callback, 'Callback');
var self = this;
checkKey(key);
var hit = self.cache.get(key);
if (!cb) {
throw(new Error('callback expected'));
if (!hit) {
return callback('Cache miss');
}
var object = this.cache.get(key);
if (!object) {
cb('cache miss');
if (hit.status === STATUS_DONE) {
return callback(null, hit.data);
}
else if (object.status === STATUS_PENDING) {
emitters[key].on('error', function (err) {
cb(err);
})
emitters[key].on('end', function (data) {
cb(null, self.cache.get(key).data);
})
}
else {
cb(null, object.data);
return;
}
}
self.emitters[key].on('error', function (error) {
callback(error);
})
self.emitters[key].on('end', function (data) {
callback(null, self.cache.get(key).data);
})
};
StreamingCache.prototype.setMetadata = function (key, metadata) {
checkKey(key);
utils.ensureDefined(key, 'Key');
var data = this.cache.get(key);
if (!data) {
data = {};
}
data.metadata = metadata;
var data = assign({}, this.cache.get(key), { metadata: metadata })
this.cache.set(key, data);

@@ -91,20 +80,13 @@ };

StreamingCache.prototype.getMetadata = function (key) {
checkKey(key);
utils.ensureDefined(key, 'Key');
var data = this.cache.get(key);
if (data && data.metadata) {
return data.metadata;
}
return null;
var hit = this.cache.get(key);
return hit && hit.metadata ? hit.metadata : null;
};
StreamingCache.prototype.exists = function (key) {
checkKey(key);
var object = this.cache.get(key);
if (object && object.status) {
return true;
}
else {
return false;
}
utils.ensureDefined(key, 'Key');
var hit = this.cache.get(key);
return !!(hit && hit.status);
};

@@ -115,80 +97,68 @@

};
function checkKey(key) {
if (!key) {
throw(new Error('Key expected'));
}
}
StreamingCache.prototype.returnPendingStream = function (key) {
var self = this;
var stream = new ReadStream();
emitters[key].on('error', function (error) {
self.emitters[key].on('error', function (error) {
stream.emit('error', error);
});
emitters[key].on('end', function (data) {
self.emitters[key].on('end', function (data) {
stream.setBuffer(data);
});
self.emitters[key].on('data', function (chunk) {
stream.updateBuffer(Buffer.concat(self.emitters[key]._buffer));
});
stream.updateBuffer(Buffer.concat(emitters[key]._buffer));
emitters[key].on('data', function (chunk) {
stream.updateBuffer(Buffer.concat(emitters[key]._buffer));
});
stream.updateBuffer(Buffer.concat(self.emitters[key]._buffer));
return stream;
}
};
StreamingCache.prototype.get = function (key) {
checkKey(key);
utils.ensureDefined(key, 'Key');
var object = this.cache.get(key);
var stream;
if (!object) {
var hit = this.cache.get(key);
if (!hit) {
return undefined;
}
else if (object.status === STATUS_PENDING) {
if (hit.status === STATUS_PENDING) {
return this.returnPendingStream(key);
}
else {
stream = new ReadStream();
stream.setBuffer(object.data);
return stream;
}
};
StreamingCache.prototype.reset = function () {
this.cache.reset();
var stream = new ReadStream();
stream.setBuffer(hit.data);
return stream;
};
StreamingCache.prototype.set = function (key) {
utils.ensureDefined(key, 'Key');
var self = this;
checkKey(key);
var metadata = self.getMetadata(key) || {};
self.cache.set(key, { status : STATUS_PENDING, metadata: metadata});
emitters[key] = new EventEmitter();
emitters[key].setMaxListeners(250);
emitters[key]._buffer = [];
self.cache.set(key, { status: STATUS_PENDING, metadata: metadata });
self.emitters[key] = new EventEmitter();
self.emitters[key].setMaxListeners(250);
self.emitters[key]._buffer = [];
var chunks = new LinkedList();
var stream = new Streams.Duplex()
stream._read = function () {
if(!chunks){
this.needRead = true;
return;
}
var chunk = chunks.shift();
if (!chunk) {
this.needRead = true;
}
else {
if (chunk) {
this.push(chunk);
this.needRead = false;
} else {
this.needRead = true;
}
}
stream._write = function (chunk, encoding, next) {
emitters[key]._buffer.push(chunk);
emitters[key].emit('data', chunk);
};
stream._write = function (chunk, encoding, next) {
self.emitters[key]._buffer.push(chunk);
self.emitters[key].emit('data', chunk);
if (this.needRead) {
this.push(chunk);
this.needRead = false;
}

@@ -203,9 +173,10 @@ else {

self.cache.del(key);
if (emitters[key] && emitters[key]._events.error) {
emitters[key].emit('error', err);
if (self.emitters[key] && self.emitters[key]._events.error) {
self.emitters[key].emit('error', err);
}
stream.removeAllListeners();
emitters[key].removeAllListeners();
delete emitters[key];
self.emitters[key].removeAllListeners();
delete self.emitters[key];
});
stream.on('finish', function () {

@@ -218,18 +189,20 @@ if (this.needRead) {

}
var c = self.cache.get(key);
chunks = null;
if (!c) {
emitters[key].emit('end', Buffer.concat(emitters[key]._buffer));
delete emitters[key];
return;
var hit = self.cache.get(key);
if (hit) {
var buffer = Buffer.concat(self.emitters[key]._buffer);
hit.metadata = hit.metadata || {};
utils.assign(hit.metadata, {
length: buffer.toString().length,
byteLength: buffer.byteLength
});
utils.assign(hit, {
data: buffer,
status: STATUS_DONE
});
self.cache.set(key, hit);
}
var buffer = Buffer.concat(emitters[key]._buffer);
c.metadata = c.metadata || {};
c.metadata.length = buffer.toString().length;
c.metadata.byteLength = buffer.byteLength;
c.data = buffer;
c.status = STATUS_DONE;
self.cache.set(key, c);
emitters[key].emit('end', Buffer.concat(emitters[key]._buffer));
delete emitters[key];
self.emitters[key].emit('end', Buffer.concat(self.emitters[key]._buffer));
delete self.emitters[key];
});

@@ -239,2 +212,6 @@ return stream;

StreamingCache.prototype.reset = function () {
this.cache.reset();
};
module.exports = StreamingCache;

@@ -12,2 +12,3 @@ 'use strict';

this.readable = false;
this.complete = false;
this._object = new Buffer(0);

@@ -26,6 +27,4 @@ }

ReadStream.prototype.complete = false;
ReadStream.prototype._push = function (size) {
if (this.ended) {
if (this.ended || !this._object.length) {
return;

@@ -38,15 +37,12 @@ }

}
if (!size) {size = 24 * 1024;}
if (!this._object.length) {
return;
}
if (this._offset + size > this._object.length) {
size = this._object.length - this._offset;
}
size = size || 24 * 1024;
size = Math.min(size, this._object.length - this._offset);
if (this._offset < this._object.length) {
var offset = this._offset;
this.push(this._object.slice(this._offset, this._offset + size));
this._offset += size ;
this.push(this._object.slice(offset, (offset + size)));
}
};
ReadStream.prototype._read = function (size) {

@@ -53,0 +49,0 @@ this._push(size);

{
"name": "streaming-cache",
"version": "0.4.1",
"version": "0.5.0",
"description": "Cache and replay NodeJS streams",

@@ -19,2 +19,3 @@ "main": "index.js",

"linkedlist": "^1.0.1",
"lodash.assign": "^4.0.1",
"lru-cache": "^2.7.0"

@@ -21,0 +22,0 @@ },

@@ -80,3 +80,3 @@ /* jshint jasmine: true */

expect(data).toEqual(undefined);
expect(err).toEqual('cache miss');
expect(err).toEqual('Cache miss');
done();

@@ -83,0 +83,0 @@ }));

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc