Comparing version 0.1.9 to 0.1.10
{ | ||
"name": "streamops", | ||
"version": "0.1.9", | ||
"version": "0.1.10", | ||
"main": "./src/main.js", | ||
@@ -5,0 +5,0 @@ "scripts": { |
@@ -116,3 +116,42 @@ module.exports = { | ||
}; | ||
} | ||
}, | ||
mergeAggregate: function(options = {}) { | ||
const { | ||
removeDuplicates = true, | ||
alwaysArray = true | ||
} = options; | ||
return function* (input) { | ||
this.result = this.result || {}; | ||
if (input !== undefined) { | ||
for (const [key, value] of Object.entries(input)) { | ||
if (!(key in this.result)) { | ||
this.result[key] = []; | ||
} | ||
if (Array.isArray(value)) { | ||
this.result[key].push(...value); | ||
} else { | ||
this.result[key].push(value); | ||
} | ||
} | ||
} | ||
let output = {}; | ||
for (const [key, value] of Object.entries(this.result)) { | ||
let processedValue = value; | ||
if (removeDuplicates) { | ||
processedValue = value.filter((v, i, self) => | ||
i === self.findIndex((t) => ( | ||
t && v && typeof t === 'object' && typeof v === 'object' | ||
? JSON.stringify(t) === JSON.stringify(v) | ||
: t === v | ||
)) | ||
); | ||
} | ||
output[key] = alwaysArray ? processedValue : (processedValue.length === 1 ? processedValue[0] : processedValue); | ||
} | ||
yield output; | ||
}; | ||
}, | ||
}; |
@@ -54,2 +54,7 @@ const operators = require('./operators'); | ||
mergeAggregate(ms) { | ||
this.pipeline.push(operators.mergeAggregate(ms)); | ||
return this; | ||
} | ||
distinct(equalityFn) { | ||
@@ -56,0 +61,0 @@ this.pipeline.push(operators.distinct(equalityFn)); |
74200
21
2537