Socket
Socket
Sign inDemoInstall

vk-call

Package Overview
Dependencies
57
Maintainers
1
Versions
6
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 2.0.0 to 2.1.0

lib/stream_utils.js

156

lib/persistent_longpoll.js
const EventEmitter = require('events');
const rp = require('request-promise');
const assign = require('object-assign');
const pull = require('pull-stream');
const debug = require('debug');
const streamUtils = require('./stream_utils');
const probe = streamUtils.probe;
const repeater = streamUtils.repeater;
const cacher = streamUtils.cacher;
const asyncMapRecover = streamUtils.asyncMapRecover;

@@ -10,2 +13,4 @@ const DEFAULT_TIMEOUT = 1000;

const LONGPOLL_TIMEOUT = 35000;
const ERROR_TYPE_UNKNOWN = 'error_unknown';

@@ -18,18 +23,2 @@ const ERROR_TYPE_TS = 'error_ts';

function probe(name) {
var log = debug(name)
return function (read) {
log('received read function, returning source function')
return function (abort, cb) {
log('source(%O' + (cb ? ',[' + (typeof cb) + ']' : '') + ')', abort)
read(abort, function next (err, data) {
log('sink(%O, ' + (!err ? ', %O' : '') + ')', err, data)
if (err) return cb(err)
cb(err, data)
})
}
}
}
function makeUrl(creds) {

@@ -40,87 +29,2 @@ return `${creds.server}?act=a_check&key=${creds.key}&ts=${creds.ts}&wait=25`;

/**
* Through stream that remembers first value it got
* and repeats it, propagates errors in both ways.
*/
function cacher() {
let cache = false;
return function(read) {
return function readable(end, cb) {
if (end === null && cache) {
cb(null, cache);
} else {
read(end, function (end, data) {
cache = data;
cb(end, data)
});
}
}
}
}
/**
* Simple through stream until it encounters
* an error from downstream. Than it asks handleError:
* if it returns true, than it will go for retry. Each consecutive retry
* will double timeout until next retry with max value as maxTimeout.
*
* After first successful retry timeout will become defaultTimeout.
*
* It will always propagate upwards errrors to downstream.
*/
function repeater(handleError, defaultTimeout, maxTimeout) {
let currentTimeout = defaultTimeout;
return function(read) {
return function readable(end, cb) {
let propogate = end !== null ? handleError(end) : true;
if (propogate) {
currentTimeout = defaultTimeout;
read(end, function (end, data) {
cb(end, data)
});
} else {
setTimeout(function() {
read(null, function (end, data) {
cb(end, data)
});
}, currentTimeout);
currentTimeout = Math.min(currentTimeout * 2, maxTimeout);
}
}
}
}
/**
* This is like asyncMap, but it receives a function that returns promise.
* And also it can recover from errors which was thrown by map function.
*
* After recieving an error, it will try to propagate this error upstream,
* and only if upstream returns an error back, it will propagate it downstream.
*/
function asyncMapRecover(map) {
return function(read) {
let mapping = false;
return function readable(end, cb) {
read(end, function (end, data) {
if (!end) {
mapping = map(data)
.then(function(data) {
mapping = false;
cb(null, data);
}).catch(function(err) {
mapping = false;
cb(err, null);
return true;
});
} else {
if (end === true && mapping) {
mapping.cancel();
}
cb(end, data);
}
});
}
}
}
/**
* This stream requests API for longpoll credentials.

@@ -145,4 +49,12 @@ */

return asyncMapRecover(function(creds) {
return http(makeUrl(assign({}, creds, mutable))).then(function(body) {
return http({
timeout: LONGPOLL_TIMEOUT,
url: makeUrl(assign({}, creds, mutable))
}).then(function(body) {
return JSON.parse(body);
}).catch((error) => {
return Promise.reject({
type: ERROR_TYPE_UNKNOWN,
error: error
});
}).then(function(response) {

@@ -159,7 +71,2 @@ switch (response.failed) {

}
}).catch((error) => {
return Promise.reject({
type: ERROR_TYPE_UNKNOWN,
error: error
});
});

@@ -177,20 +84,29 @@ });

module.exports = function init(api) {
const sink = new EventEmitter();
const drain = pull.drain(function(data) {
sink.emit('data', data);
});
pull(
function createPersistentLonpgollStream(api, rp, defaultTimeout = DEFAULT_TIMEOUT, maxTimeout = MAX_TIME_OUT) {
return pull(
pull.infinite(function() { return api; }),
repeater(isEnd, DEFAULT_TIMEOUT, MAX_TIME_OUT),
repeater(isEnd, defaultTimeout, maxTimeout),
probe('creds'),
credentialsStream,
cacher(),
repeater(isCredsError, DEFAULT_TIMEOUT, MAX_TIME_OUT),
probe('lp'),
repeater(isCredsError, defaultTimeout, maxTimeout),
longpollStream(rp, {}),
probe('data'),
drain
probe('data')
);
return { sink: sink, abort: function() { drain.abort(); } };
}
module.exports = {
createPersistentLonpgollStream: createPersistentLonpgollStream,
init(api, rp) {
const sink = new EventEmitter();
const drain = pull.drain(function(data) {
sink.emit('data', data);
});
pull(
createPersistentLonpgollStream(api, rp),
drain
);
return { sink: sink, abort: function() { drain.abort(); } };
}
}

@@ -5,3 +5,3 @@ var assign = require('object-assign');

var Chain = require('./chain').Chain;
var longpollInit = require('./persistent_longpoll');
var longpoll = require('./persistent_longpoll');

@@ -34,3 +34,3 @@ const DEFAULT_TIMEOUT = 5000;

}
return longpollInit(this);
return longpoll.init(this, rp);
},

@@ -37,0 +37,0 @@

{
"name": "vk-call",
"version": "2.0.0",
"version": "2.1.0",
"description": "node API library for VK social network",

@@ -39,6 +39,9 @@ "main": "index.js",

"chai-as-promised": "^5.2.0",
"mocha": "^2.4.5",
"mocha": "^3.5.3",
"sinon": "^1.17.3",
"sinon-chai": "^2.8.0"
},
"optionalDependencies": {
"bluebird": "^3.5.1"
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc