
dataflow-api
JavaScript API for dataflow processing using the vega-dataflow reactive engine. Perform common database operations (sorting, filtering, aggregation, window calculations) over JavaScript objects. Build and compose transformation pipelines over streaming data.
If you use npm, npm install dataflow-api. Otherwise, download the latest release. You can also load dataflow-api directly from GitHub as a standalone library. AMD, CommonJS, and vanilla environments are supported. In the vanilla setup, a df global is exported:
<script src="https://vega.github.io/dataflow-api/dataflow-api.v0.min.js"></script>
<script>
  var flow = df.dataflow([
    df.aggregate()
      .groupby(['category'])
      .measure([df.count(), df.sum('amount').as('sum')])
  ]);

  flow.insert([
    {category: 'a', amount: 12},
    {category: 'a', amount: 5},
    {category: 'b', amount: 11}
  ]);

  // [{category: 'a', count: 2, sum: 17}, {category: 'b', count: 1, sum: 11}]
  console.log(flow.values());
</script>
A dataflow is a processing pipeline that consists of a sequence of data transforms. A dataflow can either be a standalone dataflow that allows data objects to be added or removed, or a derived dataflow that processes the output of an upstream flow. All dataflows are reactive: they automatically re-evaluate upon changes to input data or upstream flows.
# df.dataflow([source,] transforms) <>
Creates and returns a new dataflow. The required transforms parameter is an array of transform descriptions. To create a dataflow that accepts input data, the transforms array should be provided as the sole argument. To instead create a derived flow, the first argument should be a source dataflow which the new dataflow will consume.
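For instance, a derived flow can post-process the output of the standalone flow created in the introductory example above (a minimal sketch; the sort order and field names reuse those from that example):

// A derived flow that consumes the output of the standalone flow above
// and sorts the aggregated groups by descending sum
var sorted = df.dataflow(flow, [
  df.sort('-sum')
]);

// Re-evaluates automatically whenever the upstream flow updates
console.log(sorted.values());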
# dataflow.values() <>
Returns the output array of data objects for the dataflow. To avoid a potentially expensive data copy, the values array is the same instance used internally by the dataflow. Making modifications to the array or any contained data objects may corrupt the state of the dataflow, affecting future updates.
# dataflow.insert(data) <>
Inserts one or more input data objects into the dataflow. The input data to insert can either be a single data object or an array of objects. Upon insertion, the dataflow is automatically re-evaluated, potentially changing the output values. Note that derived dataflows do not support an insert method.
# dataflow.remove(data) <>
Removes one or more input data objects from the dataflow. The input data to remove can either be a single data object or an array of objects. The data to remove must have already been passed as input to the dataflow via the insert method; if not, the resulting behavior is undefined. Upon removal, the dataflow is automatically re-evaluated, potentially changing the output values. Note that derived dataflows do not support a remove method.
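For example, removing a previously inserted object triggers re-evaluation (a minimal sketch; the expected outputs in the comments follow the default count aggregate described below):

var counts = df.dataflow([df.aggregate(['category'])]);

var rows = [
  {category: 'a', amount: 12},
  {category: 'a', amount: 5},
  {category: 'b', amount: 11}
];
counts.insert(rows);
// expected: [{category: 'a', count: 2}, {category: 'b', count: 1}]

// Remove one of the previously inserted objects; the flow re-evaluates automatically
counts.remove(rows[2]);
// expected: [{category: 'a', count: 2}]
console.log(counts.values());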
# dataflow.on(callback) <>
Adds a listener callback function that is invoked when the dataflow output values update. The callback is invoked within a setTimeout call after dataflow execution completes. To subsequently remove the listener, use the off method.
The callback function is invoked with a single argument containing the array of output data values. To avoid a potentially expensive data copy, the values array is the same instance used internally by the dataflow. Making modifications to the array or any contained data objects may corrupt the state of the dataflow, affecting future updates.
dataflow.on(function(values) {
// this method is invoked when the output values update
// the values array is from the internal dataflow state and is *not* copied
// make a defensive copy if you wish to modify the array
console.log(values);
});
# dataflow.off(callback) <>
Removes a listener callback function that was added using the on method.
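Because off needs to know which listener to remove, keep a reference to the function originally passed to on (a minimal sketch):

function logUpdate(values) {
  console.log('dataflow updated:', values.length, 'output objects');
}

flow.on(logUpdate);   // start listening for output updates
flow.off(logUpdate);  // later, detach the same function reference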
Transform operators that process data within a dataflow:
# df.aggregate([groupby, measure]) <>
Creates a new aggregate transform specification. The aggregate transform groups and summarizes an input data stream to produce a new output stream. Aggregate transforms can be used to compute counts, sums, averages and other descriptive statistics over groups of data objects. The optional arguments groupby and measure are shorthands for the corresponding parameter methods.
// Generate new data objects for each per-category count and amount sum
df.aggregate()
.groupby(['category'])
.measure([df.count().as('cnt'), df.sum('amount')])
// Identical specification using shorthand arguments
df.aggregate(['category'], [df.count().as('cnt'), df.sum('amount')])
// Identical specification using measure object notation
df.aggregate(['category'], [
{op: 'count', as: 'cnt'},
{op: 'sum', field: 'amount'}
])
Name | Type | Description |
---|---|---|
groupby | Array < Field > | The data fields to group by. If not specified, a single group containing all data objects will be used. |
measure | Array < Measure > | The aggregate measures to compute. If not specified, a single count aggregate is performed. The measures can use any supported aggregate operation. |
cross | Boolean | Indicates if the full cross-product of all groupby values should be included in the aggregate output (default false ). If true , all possible combinations of groupby field values will be considered and zero count groups will be generated and returned for combinations that do not occur in the data itself. Cross-product output acts as if the drop parameter is false . In the case of streaming updates, the number of output groups will increase if new groupby field values are observed; all prior groups will be retained. This parameter can be useful for generating facets that include groups for all possible partitions of the data (see the sketch following this table). |
drop | Boolean | Indicates if empty (zero count) groups should be dropped (default true ). When a data stream changes, aggregation groups may become empty. By default, the group is removed from the output. However, in some cases (such as histograms), one may wish to retain empty groups. |
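The sketch below retains empty groups. It assumes the cross and drop parameters are exposed as chainable parameter methods in the same way as groupby and measure; that chaining is an assumption, and the extra groupby field is illustrative:

// Hypothetical chaining of the cross and drop parameters
df.aggregate()
  .groupby(['category', 'region'])   // 'region' is an illustrative field name
  .measure([df.count()])
  .cross(true)    // emit every category/region combination
  .drop(false)    // keep zero-count groups in the output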
# df.bin([field]) <>
Creates a new bin transform specification. The bin transform discretizes numeric values into a set of bins. A common use case is to create a histogram. The optional argument field is a shorthand for the corresponding parameter method.
// Bin the 'amount' field, up to a maximum of 30 bins
// Write the bin boundaries to the fields 'bin_start' and 'bin_end'
df.bin().field('amount').maxbins(30).as(['bin_start', 'bin_end'])
// Identical specification using shorthand arguments
df.bin('amount').maxbins(30).as(['bin_start', 'bin_end'])
Name | Type | Description |
---|---|---|
field | Field | Required. The data field to bin. |
extent | Array < Number > | A two-element array with the minimum and maximum values of the bin range. If unspecified, the extent is set to [min, max] of the observed data values. |
anchor | Number | A value in the binned domain at which to anchor the bins, shifting the bin boundaries if necessary to ensure that a boundary aligns with the anchor value. By default, the minimum bin extent value serves as the anchor. |
maxbins | Number | The maximum number of bins to create (default 20 ). |
base | Number | The number base to use for automatic bin determination (default 10 ). |
step | Number | An exact step size to use between bins. If provided, options such as maxbins will be ignored. |
steps | Array < Number > | An array of allowable step sizes to choose from. |
minstep | Number | The minimum allowed bin step size (default 0 ). |
divide | Array < Number > | Allowable bin step sub-divisions. The default value is [5, 2] , which indicates that for base 10 numbers (the default base) automatic bin determination can consider dividing bin step sizes by 5 and/or 2. |
nice | Boolean | If true (the default), attempts to make the bin boundaries use human-friendly boundaries, such as multiples of ten. |
as | Array < String > | The output field names at which to write the start and end bin values. The default is ["bin0", "bin1"] . |
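As noted above, a common use case is a histogram. A minimal sketch chains a bin transform with a count aggregate over the bin boundaries (the field and bin names are illustrative):

var histogram = df.dataflow([
  df.bin('amount').maxbins(20).as(['bin0', 'bin1']),
  df.aggregate(['bin0', 'bin1'], [df.count()])
]);

histogram.insert([
  {amount: 3}, {amount: 5}, {amount: 27}, {amount: 31}
]);
console.log(histogram.values());  // one output object per non-empty bin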
# df.countpattern([field, pattern, case]) <>
Creates a new countpattern transform specification. The countpattern transform counts the number of occurrences of a text pattern, as defined by a regular expression. This transform will iterate through each data object and count all unique pattern matches found within the designated text field. The optional arguments field, pattern and case are shorthands for the corresponding parameter methods.
// Count all alphabetic substrings in the 'description' field
// This example maps all input text to lowercase.
df.countpattern().field('description').pattern(/[a-z]+/).case('lower')
// Identical specification using shorthand arguments
df.countpattern('description', /[a-z]+/, 'lower')
Name | Type | Description |
---|---|---|
field | Field | Required. The data field containing the text data. |
pattern | RegExp | A regular expression indicating the pattern to count. All unique pattern matches will be separately counted. The default value is [\\w\']+ , which will match sequences containing word characters and apostrophes, but no other characters. |
case | String | A lower- or upper-case transformation to apply prior to pattern matching. One of "lower" , "upper" or "mixed" (the default). |
stopwords | String | A regular expression defining a pattern of text to ignore. For example, a value such as "(foo|bar|baz)" would treat the words "foo", "bar", and "baz" as stopwords to be ignored. |
as | Array < String > | The output field names for the text pattern and occurrence count. The default is ["text", "count"] . |
# df.filter([expr]) <>
Creates a new filter transform specification. The filter transform removes objects from a data stream based on a provided filter expression. The optional argument expr is a shorthand for the corresponding parameter method.
let predicate = df.expr(d => d.amount > 100).fields(['amount'])
// Remove data objects with 'amount' values <= 100
df.filter().expr(predicate)
// Identical specification using shorthand arguments
df.filter(predicate)
Name | Type | Description |
---|---|---|
expr | Expression | Required. A predicate function for filtering the data. If the expression evaluates to false , the data object will be filtered. |
# df.fold([fields]) <>
Creates a new fold transform specification. The fold transform collapses (or “folds”) one or more data fields into two properties: a key property (containing the original data field name) and a value property (containing the data value). The fold transform is useful for mapping matrix or cross-tabulation data into a standardized format. This transform generates a new data stream in which each data object consists of the key and value properties as well as all the original fields of the corresponding input data object. The optional argument fields is a shorthand for the corresponding parameter method.
// Collapse the 'fieldA' and 'fieldB' fields into key-value pairs
// The output stream will contain twice as many data objects
df.fold().fields(['fieldA', 'fieldB'])
// Identical specification using shorthand arguments
df.fold(['fieldA', 'fieldB'])
Name | Type | Description |
---|---|---|
fields | Array < Field > | Required. An array of data fields indicating the properties to fold. |
as | Array < String > | The output field names for the key and value properties produced by the fold transform. The default is ["key", "value"] . |
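To make the key/value mapping concrete, here is a sketch of the output the fold transform would produce for a single input object, per the description above (the field names are illustrative and the property order is not guaranteed):

df.fold(['gold', 'silver'])
// For an input object {country: 'US', gold: 10, silver: 20}, this fold emits
// two output objects, each retaining the original fields:
//   {key: 'gold',   value: 10, country: 'US', gold: 10, silver: 20}
//   {key: 'silver', value: 20, country: 'US', gold: 10, silver: 20}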
# df.formula([as, expr]) <>
Creates a new formula transform specification. The formula transform extends data objects with new values according to a calculation formula. The optional arguments as and expr are shorthands for the corresponding parameter methods.
let mag = df.expr(d => Math.sqrt(d.u * d.u + d.v * d.v)).fields(['u', 'v'])
// Extend each object with a 'magnitude' field defined by the given function
df.formula().as('magnitude').expr(mag)
// Identical specification using shorthand arguments
df.formula('magnitude', mag)
Name | Type | Description |
---|---|---|
expr | Expression | Required. The formula function for calculating derived values. |
as | String | Required. The output field at which to write the formula value. |
initonly | Boolean | If true , the formula is evaluated only when a data object is first observed. The formula values will not automatically update if data objects are modified. The default is false . |
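The initonly parameter can be used to freeze a value at the time a data object first enters the flow. The sketch below assumes initonly is exposed as a chainable parameter method like as and expr; that chaining is an assumption:

// Hypothetical: record the amount observed when the object first arrives;
// later modifications to the object will not update this field
df.formula()
  .as('initial_amount')
  .expr(df.expr(d => d.amount).fields(['amount']))
  .initonly(true)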
# df.joinaggregate([groupby, measure]) <>
Creates a new joinaggregate transform specification. The joinaggregate transform extends the input data objects with aggregate values. Aggregation is performed and the results are then joined with the input data. This transform can be helpful for creating derived values that combine both raw data and aggregate calculations, such as percentages of group totals. The optional arguments groupby and measure are shorthands for the corresponding parameter methods.
The parameters for this transform are identical to the aggregate transform, but rather than creating new output objects, the results are written back to each of the input data objects. An equivalent result can be achieved using a window transform where the sliding window frame encompasses the entire group; however, the joinaggregate provides a more performant alternative for this special case.
// Extend each data object with per-category counts and sum(amount)
df.joinaggregate()
.groupby(['category'])
.measure([df.count().as('cnt'), df.sum('amount')])
// Identical specification using shorthand arguments
df.joinaggregate(['category'], [df.count().as('cnt'), df.sum('amount')])
// Identical specification using measure object notation
df.joinaggregate(['category'], [
{op: 'count', as: 'cnt'},
{op: 'sum', field: 'amount'}
])
Name | Type | Description |
---|---|---|
groupby | Array < Field > | The data fields to group by. If not specified, a single group containing all data objects will be used. |
measure | Array < Measure > | The aggregate measures to compute. If not specified, a single count aggregate is performed. The measures can use any supported aggregate operation. |
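For instance, the percentage-of-group-total use case mentioned above can be built by following a joinaggregate with a formula (a minimal sketch; field names are illustrative):

var pct = df.expr(d => d.amount / d.total).fields(['amount', 'total']);

var shares = df.dataflow([
  // write each category's total back onto every input object
  df.joinaggregate(['category'], [df.sum('amount').as('total')]),
  // then derive each object's share of its category total
  df.formula('pct_of_total', pct)
]);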
# df.project([fields]) <>
Creates a new project transform specification. The project transform performs a relational algebra projection operation. This transform produces a stream of new data objects that include one or more fields of the source stream, with the data fields optionally renamed. The optional argument fields is a shorthand for the corresponding parameter method.
// Project the 'amount' field to new objects with a single field named 'value'
df.project().fields([df.field('amount').as('value')])
// Identical specification using shorthand arguments
df.project([df.field('amount').as('value')])
// Identical specification using field object notation
df.project([{field: 'amount', as: 'value'}])
Name | Type | Description |
---|---|---|
fields | Array < Field > | The data fields that should be copied over in the projection. If unspecified, all fields will be copied using their existing names. |
# df.sample([size]) <>
Creates a new sample transform specification. The sample transform randomly samples a data stream to create a smaller stream. As input data objects are added and removed, the sampled values may change in a first-in, first-out manner. This transform uses reservoir sampling to maintain a representative sample of the stream. The optional argument size is a shorthand for the corresponding parameter method.
// Collect a random sample of 500 data objects
df.sample().size(500)
// Identical specification using shorthand arguments
df.sample(500)
Name | Type | Description |
---|---|---|
size | Number | The maximum number of data objects to include in the sample. The default value is 1000 . |
# df.sort([compare]) <>
Creates a new sort transform specification. This transform materializes all the objects in a data stream within a single array, allowing sorting by data field values. The optional argument compare is a shorthand for the corresponding parameter method.
// Sort in descending order by the 'amount' field
df.sort().compare('-amount')
// Identical specification using shorthand arguments
df.sort('-amount')
// Identical specification using comparator object notation
df.sort({fields: ['amount'], orders: ['descending']})
// Identical specification using an explicit comparator expression
df.sort(df.expr((a, b) => b.amount - a.amount).fields(['amount']))
Name | Type | Description |
---|---|---|
compare | Compare | A comparator for sorting data objects. |
# df.window([compare, frame, measure]) <>
Creates a new window transform specification. The window transform performs calculations over sorted groups of data objects. These calculations include ranking, lead/lag analysis, and aggregates such as running sums and averages. Calculated values are written back to the input data stream. The optional arguments compare, frame and measure are shorthands for the corresponding parameter methods.
df.window()
.compare('amount')
.frame([null, null])
.measure([df.rank(), df.sum('amount')])
.groupby(['category'])
// Identical specification using shorthand arguments
df.window('amount', [null, null], [df.rank(), df.sum('amount')])
.groupby(['category'])
// Identical specification using measure object notation
df.window('amount', [null, null], [
{op: 'rank'},
{op: 'sum', field: 'amount'}
]).groupby(['category'])
Name | Type | Description |
---|---|---|
compare | Compare | A comparator for sorting data objects within a window. If two data objects are considered equal by the comparator, they are considered "peer" values of equal rank. If compare is not specified, the order is undefined: data objects are processed in the order they are observed and none are considered peers (the ignorePeers parameter is ignored and treated as if set to true ). |
groupby | Array < Field > | The data fields by which to partition data objects into separate windows. If not specified, a single group containing all data objects will be used. |
measure | Array < Measure > | The window measures to compute. The measures can use any supported aggregate or window operation. |
frame | Array < Number > | A frame specification as a two-element array indicating how the sliding window should proceed. The array entries should either be a number indicating the offset from the current data object, or null to indicate unbounded rows preceding or following the current data object. The default value is [null, 0] , indicating that the sliding window includes the current object and all preceding objects. The value [-5, 5] indicates that the window should include five objects preceding and five objects following the current object. Finally, [null, null] indicates that the window frame should always include all data objects. |
ignorePeers | Boolean | Indicates if the sliding window frame should ignore peer values. (Peer values are those considered identical by the compare criteria). The default is false , causing the window frame to expand to include all peer values. If set to true , the window frame will be defined by offset values only. This setting only affects those operations that depend on the window frame: aggregation operations and the first_value, last_value, and nth_value window operations. |
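For example, with the default frame of [null, 0], a sum measure becomes a running total over each sorted partition (a sketch reusing the fields from the earlier examples):

// Running total of amount within each category, ordered by ascending amount
df.window()
  .compare('amount')
  .groupby(['category'])
  .measure([df.sum('amount').as('running_total')])
// frame defaults to [null, 0]: the current object plus all preceding objects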
Parameter types for dataflow transforms:
# Array
An Array instance representing a collection of values.
# Boolean
A Boolean value. The values null and "" map to null. The strings "false" and "0" map to false. Any other values are subject to boolean coercion (!!value).
# Compare
A comparator is a function that takes two arguments a and b as input and compares them to determine a rank ordering, returning a value less than zero if a < b, a value greater than zero if a > b, and zero if the two values are equivalent. Comparators can be specified in multiple ways:
- A string indicating a data field name, with an optional + or - prefix to indicate ascending or descending sort, respectively. If no prefix is supplied an ascending order is assumed. For example: "amount" (implicit ascending order), "+amount" (explicit ascending order), "-amount" (descending order).
- An array of field name strings, each with an optional order prefix. For example: ["-amount", "+age"].
- An object with fields and orders properties providing an array of data fields to order by and an array of corresponding orders ("ascending" or "descending"). For example: {fields: ["amount", "age"], orders: ["descending", "ascending"]}.
- An expression function constructed with the expr API. For example: df.expr((a, b) => (b.amount - a.amount) || (a.age - b.age)).fields(['amount', 'age']).
# Expression
An expression is a function that takes one or more data objects as arguments and returns a calculated value. Expressions are useful as filtering predicates and formula calculations, or to provide customized comparators.
Expressions should be constructed using the expr API: df.expr(datum => datum.x * datum.x + datum.y * datum.y).fields(['x', 'y']).
# Field
A field is a named data attribute (or in tabular terms, a data column). These fields correspond to possibly nested properties of a data object. Field references can be specified in multiple ways:
- A field object created using the field method: df.field("field").as("name"), indicating the string field name (or an accessor function) and optional name alias for output.
- A string field name. Nested properties can be accessed using dot (.) or bracket ([]) notation. For example: "amount", "source.x", "target['x']". To specify field names that contain dots but are not nested lookups, escape the dot inline ("my\\.field") or enclose the field name in brackets ("[my.field]").
- An object with a field property and optional as property. The field property should be either a string field name or an expression function. The optional as property specifies a name for the field, and can be used to specify the output names for a project transform or aggregate groupby. If as is not specified, a given field name string will be used.
- An expression function. For example: df.expr(d => Math.sqrt(d.amount)).fields(['amount']).as('sqrt_amount').
# Measure
A measure is a window or aggregate operation to apply across a collection of data values. Measures can be specified in multiple ways:
- A measure object created using the operation methods. For example: df.sum('amount'), df.ntile(4).
- An object with an operation name and parameters. For example: {"op": "sum", "field": "amount"}, {"op": "ntile", "param": 4}. The supported object properties are:
  - op: the window or aggregate operation name. This property is required in all cases.
  - field: a data field reference. This property is required for all aggregate operations and for window operations that operate over data field values.
  - param: an operation parameter. Applicable only to a subset of window operations.
  - as: output field name. Optional property to specify the output field.
# Number
A Number value. The values null and "" map to null. Any other values are subject to number coercion (+value).
# RegExp
A RegExp value representing a well-formatted regular expression. The values null and "" map to null. A RegExp value is used as-is. Any other values are subject to string coercion (value + '') and then interpreted as properly-escaped regular expression strings.
# String
A String value. The values null and "" map to null. Any other values are subject to string coercion (value + '').
Aggregate operations that can be used as entries of the measure parameter of the aggregate, joinaggregate, and window transforms. For each operation, the as method call is optional.
# df.count().as(name) <>
The total count of data objects in an aggregation group.
# df.valid(field).as(name) <>
The count of field values that are not null, undefined, or NaN.
# df.missing(field).as(name) <>
The count of null or undefined field values.
# df.distinct(field).as(name) <>
The count of distinct field values.
# df.sum(field).as(name) <>
The sum of field values.
# df.mean(field).as(name) <>
The mean (average) field value.
# df.average(field).as(name) <>
The mean (average) field value. Identical to mean.
# df.variance(field).as(name) <>
The sample variance of field values.
# df.variancep(field).as(name) <>
The population variance of field values.
# df.stdev(field).as(name) <>
The sample standard deviation of field values.
# df.stdevp(field).as(name) <>
The population standard deviation of field values.
# df.stderr(field).as(name) <>
The standard error of field values.
# df.median(field).as(name) <>
The median field value.
# df.q1(field).as(name) <>
The lower quartile boundary of field values.
# df.q3(field).as(name) <>
The upper quartile boundary of field values.
# df.ci0(field).as(name) <>
The lower boundary of the bootstrapped 95% confidence interval of the mean field value.
# df.ci1(field).as(name) <>
The upper boundary of the bootstrapped 95% confidence interval of the mean field value.
# df.min(field).as(name) <>
The minimum field value.
# df.max(field).as(name) <>
The maximum field value.
# df.argmin(field).as(name) <>
An input data object containing the minimum field value.
# df.argmax(field).as(name) <>
An input data object containing the maximum field value.
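For example, argmax can be used within an aggregate to retrieve the full record behind each group's maximum value (a minimal sketch reusing the field names from earlier examples):

// For each category, keep the data object holding the largest amount
df.aggregate(['category'], [df.argmax('amount').as('top')])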
Window operations that can be used as entries of the measure parameter of the window transform. For each operation, the as method call is optional.
# df.row_number().as(name) <>
Assigns each data object a consecutive row number, starting from 1.
# df.rank().as(name) <>
Assigns a rank order value to each data object in a window, starting from 1. Peer values are assigned the same rank. Subsequent rank scores incorporate the number of prior values. For example, if the first two values tie for rank 1, the third value is assigned rank 3.
# df.dense_rank().as(name) <>
Assigns dense rank order values to each data object in a window, starting from 1. Peer values are assigned the same rank. Subsequent rank scores do not incorporate the number of prior values. For example, if the first two values tie for rank 1, the third value is assigned rank 2.
# df.percent_rank().as(name) <>
Assigns a percentage rank order value to each data object in a window. The percent is calculated as (rank - 1) / (group_size - 1).
# df.cume_dist().as(name) <>
Assigns a cumulative distribution value between 0 and 1 to each data object in a window.
# df.ntile(parameter).as(name) <>
Assigns a quantile (e.g., percentile) value to each data object in a window. Accepts an integer parameter indicating the number of buckets to use (e.g., 100 for percentiles, 5 for quintiles).
# df.lag(field[, parameter]).as(name) <>
Assigns the value of field from the data object that precedes the current object by a specified number of positions. If no such object exists, assigns null. Accepts an offset parameter (default 1) that indicates the number of positions.
# df.lead(field[, parameter]).as(name) <>
Assigns the value of field from the data object that follows the current object by a specified number of positions. If no such object exists, assigns null. Accepts an offset parameter (default 1) that indicates the number of positions.
# df.first_value(field).as(name) <>
Assigns the value of field from the first data object in the current sliding window frame.
# df.last_value(field).as(name) <>
Assigns the value of field from the last data object in the current sliding window frame.
# df.nth_value(field[, parameter]).as(name) <>
Assigns the value of field from the nth data object in the current sliding window frame. If no such object exists, assigns null. Requires a non-negative integer parameter that indicates the offset from the start of the window frame.
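As an illustration of the lead/lag operations, the sketch below writes each object's previous amount within its category, ordered by a date field (the date field name is illustrative):

// For each object, look up the amount of the preceding object in its category
df.window()
  .compare('date')                 // sort each partition by ascending date
  .groupby(['category'])
  .measure([df.lag('amount', 1).as('prev_amount')])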