ad-promise
Advanced tools
Comparing version 1.0.5 to 1.0.6
@@ -10,2 +10,4 @@ | ||
const User = require('../../models/user'); | ||
const maxPromises = require('limitpromises'); | ||
const maxPromiseConfig = require('../../configs/config.maxPromiseGroup'); | ||
@@ -31,34 +33,34 @@ const chunkItem = function(members, opts, self) { | ||
}; | ||
search.call(self, localOpts, (err, members) => { | ||
search.call(self, localOpts, async function onSearch(err, members){ | ||
if (err) { | ||
reject(err); | ||
} | ||
let usersResolved = []; | ||
for(index in members){ | ||
let member = members[index]; | ||
let resolvable = getResolvable(); | ||
usersResolved.push(resolvable); | ||
let usersResolved = maxPromises(member => { | ||
return new Promise( (resolve, reject) => { | ||
if(member){ | ||
if(!member.groupType){ | ||
let user = new User(pickAttributes(member, (opts || {}).attributes || defaultAttributes.user)); | ||
self.emit(user); | ||
users.push(user); | ||
resolve(user); | ||
} else { | ||
self.getUsersForGroup(opts, member.cn).then(nestedUsers => { | ||
users = [].concat(users,nestedUsers); | ||
resolve(); | ||
}, err => { | ||
reject(err); | ||
}); | ||
} | ||
} | ||
}); | ||
}, members, 2500, "getChunkItem"); | ||
if(!member.groupType){ | ||
let user = new User(pickAttributes(member, (opts || {}).attributes || defaultAttributes.user)); | ||
self.emit(user); | ||
users.push(user); | ||
resolvable.resolveProm(); | ||
} else { | ||
self.getUsersForGroup(opts, member.cn).then((nestedUsers) => { | ||
users = [].concat(users,nestedUsers); | ||
resolvable.resolveProm(); | ||
}, err => { | ||
reject(err); | ||
}); | ||
} | ||
} | ||
Promise.all(usersResolved.map(userResolved => {return userResolved.prom})).then(() => { | ||
try{ | ||
await Promise.all(usersResolved.map( userResolved => { return userResolved.promiseFunc })); | ||
resolve(users); | ||
}, err => { | ||
console.log(err); | ||
} catch(err) { | ||
reject(err); | ||
}); | ||
} | ||
}); | ||
@@ -68,10 +70,2 @@ }); | ||
const getResolvable = () => { | ||
let resolveProm; | ||
let prom = new Promise((resolve, reject) => { | ||
resolveProm = resolve; | ||
}); | ||
return { resolveProm, prom } | ||
} | ||
module.exports = chunkItem; |
// this module will give you the possibility to make add alot of promises, but it will make sure, that only a fixed number is open. | ||
// It was initially created to limit the number of promises making a tcp request (as windows only allows 5000 at once by default) | ||
const getLaunchArray = (PromiseWithTcpRequest, InputValues) => { | ||
// If you provide a key you can make sure that you limit all the operations of a similar type. For example name your Type 'TCP' and all the functons | ||
// calling with that key will use the same launch array. So if F1 calls it with 5000 Promises and F2 calls it with 5000 Promises the total limit will | ||
// still be intact and not doubled | ||
let currentPromiseArrays = {}; | ||
let currentPromiseMaxNumbers = {}; | ||
/** | ||
* Returns an Array of Objects that are used in the PromisesWithMaxAtOnce Function. | ||
* | ||
* @public | ||
* @param {Function} PromiseFunc Function that returns a Promise with one InputParameter that is used | ||
* @param {Array} InputValues Array with the Inputvalues. One promise for each entry is created | ||
* @param {Number} StartingIndex Every entry will have an index to later determin which promise of the array was resolved | ||
*/ | ||
const getLaunchArray = (PromiseFunc, InputValues, StartingIndex) => { | ||
// This function will return a launch Array. It takes a function that returns a promise and it's input Values as an array | ||
@@ -9,15 +25,33 @@ // The output is an array with each entry having 3 elements | ||
// launchPromise triggers the execution of promisewithTcpRequest | ||
// promiseWithTcpRequest The Input Promise with the correlating Inputvalue | ||
// promiseFunc The Input Promise with the correlating Inputvalue | ||
let launchArray = InputValues.map((InputValue, Index) => { | ||
var resolveLaunchPromise; | ||
var launchPromise = new Promise((resolve, reject) => { | ||
resolveLaunchPromise = resolve; | ||
let startingIndex = StartingIndex ? StartingIndex : 0; | ||
let launchArray = InputValues.map(function(InputValue, Index) { | ||
let obj = {}; | ||
let resLPromise; | ||
// Expose the resolve of the promise, so it can be called from outsite | ||
obj.launchPromise = new Promise((resolve, reject) => { | ||
resLPromise = resolve; | ||
}); | ||
// Add some logic to the resolvePromise | ||
obj.resolveLaunchPromise = () => { | ||
obj.isRunning = true; | ||
resLPromise(); | ||
} | ||
var promiseWithTcpRequest = new Promise((resolve, reject) => { | ||
launchPromise.then(() => { | ||
PromiseWithTcpRequest(InputValue).then((data) =>{ | ||
obj.isRunning = false; | ||
obj.isRejected = false; | ||
obj.isResolved = false; | ||
obj.index = startingIndex + Index; | ||
obj.promiseFunc = new Promise((resolve, reject) => { | ||
obj.launchPromise.then(() => { | ||
PromiseFunc(InputValue).then((data) =>{ | ||
obj.isRunning = false; | ||
obj.isResolved = true; | ||
resolve(data); | ||
}, (err) => { | ||
obj.isRunning = false; | ||
obj.isRejected = true; | ||
reject(err) | ||
@@ -27,3 +61,3 @@ }); | ||
}); | ||
return {resolveLaunchPromise, launchPromise, promiseWithTcpRequest}; | ||
return obj; | ||
}); | ||
@@ -34,29 +68,80 @@ | ||
const PromisesWithMaxAtOnce = (PromiseWithTcpRequest, InputValues, MaxAtOnce) => { | ||
/** | ||
* For the specified group, retrieve all of the users that belong to the group. | ||
* | ||
* @public | ||
* @param {Function} PromiseFunc Function that returns a Promise with one InputParameter that is used | ||
* @param {Array} InputValues Array with the Inputvalues. One promise for each entry is created | ||
* @param {Number} MaxAtOnce Number of Promises that can run at the same time | ||
* @param {String} TypeKey A Key that is set to group promises together. So e.g. you set the key to TCP no matter which function calls with that Key it wont exceed the maxAtOnce Promises | ||
*/ | ||
const PromisesWithMaxAtOnce = (PromiseFunc, InputValues, MaxAtOnce, TypeKey) => { | ||
// You can input any promise that should be limited by open at the same time | ||
// PromiseWithTcpRequest is a function that returns a promise and takes in an input value | ||
// PromiseFunc is a function that returns a promise and takes in an input value | ||
// InputValue is an Array of those InputValues | ||
// MaxAtOnce is the number of Promises maximum pending at the same time | ||
let startedPromises = 0; | ||
let launchArray = getLaunchArray(PromiseWithTcpRequest, InputValues); | ||
if(TypeKey){ | ||
currentPromiseArrays[TypeKey] = currentPromiseArrays[TypeKey] || []; | ||
MaxAtOnce = currentPromiseMaxNumbers[TypeKey] ? currentPromiseMaxNumbers[TypeKey] : MaxAtOnce; | ||
if(!currentPromiseMaxNumbers[TypeKey]) currentPromiseMaxNumbers[TypeKey] = MaxAtOnce; | ||
} | ||
let alreadyRunning = TypeKey ? (currentPromiseArrays[TypeKey] ): []; | ||
let runningPromises = getCountRunningPromises(alreadyRunning); | ||
let launchArray = getLaunchArray(PromiseFunc, InputValues, alreadyRunning.length); | ||
alreadyRunning = alreadyRunning.concat(launchArray); | ||
// Launch idex is the current index of the promise in the array that is beeing started; | ||
let launchIndex = getCountFinishedOrRunningPromises(alreadyRunning); | ||
// First start as much promises as are allowed at once (if there are less in the array than max allowed, start all of them) | ||
for(i=0; i<(MaxAtOnce<launchArray.length ? MaxAtOnce : launchArray.length); i++){ | ||
launchArray[i].resolveLaunchPromise(); | ||
startedPromises++; | ||
for(let i=launchIndex; runningPromises < MaxAtOnce && i < alreadyRunning.length; i++){ | ||
alreadyRunning[i].resolveLaunchPromise(); | ||
runningPromises = getCountRunningPromises(alreadyRunning); | ||
launchIndex = getCountFinishedOrRunningPromises(alreadyRunning); | ||
} | ||
// For each Promise that finishes start a new one until all are launched | ||
launchArray.map((Value, Index) => { | ||
Value.promiseWithTcpRequest.then(() => { | ||
if(startedPromises<launchArray.length){ | ||
launchArray[startedPromises].resolveLaunchPromise(); | ||
startedPromises++; | ||
} | ||
}); | ||
alreadyRunning.map((Value, Index) => { | ||
// Only map for indices bigger than the current launch index as everything smaller has already been launched; | ||
if(Index >= launchIndex -1 && launchIndex < alreadyRunning.length){ | ||
Value.promiseFunc.then(() => { | ||
if(launchIndex < alreadyRunning.length){ | ||
alreadyRunning[launchIndex].resolveLaunchPromise(); | ||
} | ||
runningPromises = getCountRunningPromises(alreadyRunning); | ||
launchIndex = getCountFinishedOrRunningPromises(alreadyRunning); | ||
}, err => { | ||
console.log("ERROR IN SERVICE.MAXPROMISESATONCE"); | ||
console.log(err); | ||
if(launchIndex<alreadyRunning.length){ | ||
alreadyRunning[runningPromises].resolveLaunchPromise(); | ||
} | ||
runningPromises = getCountRunningPromises(alreadyRunning); | ||
launchIndex = getCountFinishedOrRunningPromises(alreadyRunning); | ||
}); | ||
} | ||
}); | ||
if(TypeKey){ | ||
currentPromiseArrays[TypeKey] = alreadyRunning; | ||
} | ||
return launchArray; | ||
} | ||
function getCountRunningPromises(PromiseArray){ | ||
// | ||
return PromiseArray.filter(Entry => {return Entry.isRunning === true}).length; | ||
} | ||
function getCountFinishedOrRunningPromises(PromiseArray){ | ||
return PromiseArray.filter(Entry => {return Entry.isRunning || Entry.isResolved || Entry.isRejected}).length; | ||
} | ||
module.exports = PromisesWithMaxAtOnce; |
@@ -187,3 +187,7 @@ | ||
log.debug('Querying active directory (%s) with filter "%s" for %j', | ||
baseDN, truncateLogOutput(opts.filter), _.any(opts.attributes) ? opts.attributes : '[*]'); | ||
baseDN, | ||
truncateLogOutput(opts.filter), | ||
_.any(opts.attributes) ? opts.attributes : '[*]' | ||
); | ||
client.search(baseDN, getLdapOpts(opts), controls, function onSearch(err, res) { | ||
@@ -190,0 +194,0 @@ if (err) { |
@@ -9,2 +9,3 @@ var _ = require('underscore'); | ||
const defaultAttributes = require('../configs/config.defaultAttributes'); | ||
const maxPromises = require('limitpromises'); | ||
@@ -19,3 +20,3 @@ /** | ||
*/ | ||
function getUsersForGroup(opts, groupName, callback) { | ||
async function getUsersForGroup(opts, groupName, callback) { | ||
var self = this; | ||
@@ -34,52 +35,44 @@ return new Promise((resolve, reject) => { | ||
attributes: joinAttributes((opts || {}).attributes || defaultAttributes.group, ['member']) | ||
}), | ||
groupName, async function (err, group) { | ||
if (err) { | ||
if (callback) callback(err); | ||
reject(err); | ||
return; | ||
}), groupName).then(async function (group) { | ||
// Group not found | ||
if (!group) { | ||
if (callback) callback(null, group); | ||
resolve(group); | ||
return; | ||
} | ||
// If only one result found, encapsulate result into array. | ||
if (typeof (group.member) === 'string') { | ||
group.member = [group.member]; | ||
} | ||
/** | ||
* Breaks the large array into chucks of the specified size. | ||
* @param {Array} arr The array to break into chunks | ||
* @param {Number} chunkSize The size of each chunk. | ||
* @returns {Array} The resulting array containing each chunk | ||
*/ | ||
function chunk(arr, chunkSize) { | ||
var result = []; | ||
for (var index = 0, length = arr.length; index < length; index += chunkSize) { | ||
result.push(arr.slice(index, index + chunkSize)); | ||
} | ||
// Group not found | ||
if (!group) { | ||
if (callback) callback(null, group); | ||
resolve(group); | ||
return; | ||
} | ||
// If only one result found, encapsulate result into array. | ||
if (typeof (group.member) === 'string') { | ||
group.member = [group.member]; | ||
} | ||
/** | ||
* Breaks the large array into chucks of the specified size. | ||
* @param {Array} arr The array to break into chunks | ||
* @param {Number} chunkSize The size of each chunk. | ||
* @returns {Array} The resulting array containing each chunk | ||
*/ | ||
function chunk(arr, chunkSize) { | ||
var result = []; | ||
for (var index = 0, length = arr.length; index < length; index += chunkSize) { | ||
result.push(arr.slice(index, index + chunkSize)); | ||
} | ||
return (result); | ||
} | ||
// We need to break this into the default size queries so | ||
// we can have them running concurrently. | ||
var chunks = chunk(group.member || [], defaultPageSize); | ||
if (chunks.length > 1) { | ||
log.debug('Splitting %d member(s) of "%s" into %d parallel chunks', | ||
(group.member || []).length, groupName, chunks.length); | ||
} | ||
return (result); | ||
} | ||
const allChunksDone = []; | ||
// Chunks represent the cn for each user; | ||
for (index in chunks){ | ||
let thisChunkDone; | ||
allChunksDone.push(new Promise((resolve, reject) => {thisChunkDone = resolve;})); | ||
chunkItem(chunks[index], opts, self).then(members => { | ||
result = result.concat(members); | ||
thisChunkDone(); | ||
}, err => { | ||
// We need to break this into the default size queries so | ||
// we can have them running concurrently. | ||
var chunks = chunk(group.member || [], defaultPageSize); | ||
if (chunks.length > 1) { | ||
log.debug('Splitting %d member(s) of "%s" into %d parallel chunks', | ||
(group.member || []).length, groupName, chunks.length); | ||
} | ||
// Chunks represent the cn for each user; | ||
// We use the maxPromises Function which will limit the number of promises running at the same time. | ||
// This is necessary to avoid that the socket of the AD is in use and thus cannot be accessed | ||
let allChunks = maxPromises(Chunk => { | ||
return new Promise((resolve, reject) => { | ||
chunkItem(Chunk, opts, self).then(members => { | ||
resolve(result.concat(members)); | ||
}, err => { | ||
if(callback){ | ||
@@ -90,13 +83,27 @@ callback(err); | ||
}); | ||
}); | ||
}, chunks, 2500, "getUsersForGroup"); | ||
// Wait for all the chunks to be ready then send the result back; | ||
await Promise.all(allChunks.map(Chunk => { | ||
return Chunk.promiseFunc | ||
})).then(data => { | ||
if(callback){ | ||
if(data.length === 0){ | ||
callback(null, []); | ||
} else { | ||
callback(null, data[0]); | ||
} | ||
} | ||
await Promise.all(allChunksDone).then(data => { | ||
if(callback){ | ||
callback(null, result); | ||
} | ||
return resolve(result); | ||
}, err => { | ||
return reject(err); | ||
}); | ||
return data.length === 0 ? resolve([]) : resolve(data[0]); | ||
}, err => { | ||
return reject(err); | ||
}); | ||
}, err => { | ||
if (callback) callback(err); | ||
return reject(err); | ||
}); | ||
}); | ||
@@ -103,0 +110,0 @@ |
{ | ||
"author": "Relief Melone (relief.melone@gmail.com)", | ||
"name": "ad-promise", | ||
"version": "1.0.5", | ||
"version": "1.0.6", | ||
"description": "This is a fork of the gheeres node-activedirectory. This is still a work in progress and not stable. The goal to use promises instead of the async library which is causing some problems in the current version of node-activedirectory", | ||
@@ -24,3 +24,4 @@ "main": "index.js", | ||
"ldapjs": ">= 0.7.1", | ||
"underscore": ">= 1.4.3" | ||
"underscore": ">= 1.4.3", | ||
"limitpromises": ">=1.0.1" | ||
}, | ||
@@ -27,0 +28,0 @@ "repository": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
206947
74
4020
4
+ Addedlimitpromises@>=1.0.1
+ Addedlimitpromises@1.5.5(transitive)