
Security News
Axios Maintainer Confirms Social Engineering Attack Behind npm Compromise
Axios compromise traced to social engineering, showing how attacks on maintainers can bypass controls and expose the broader software supply chain.
A library providing an expression language to compose and evaluate stream processing
The real-value library is an expression language for composing flows of time series based data
A stream is a set of values available through time. For the purpose of this document a stream is a common separated set of values.
//logical stream of values
,1,,,,2,,3
It would be convenient to have a from operator that can reify a stream.
from(',1,,,,2,,3')
Streams can implicitly represent time.
from(',1,0,,',{columns: 'status}') //logically expands to this '{time:0, status:null},{time: 2, status:1},{time:3,status:0},{time: 4,status:null}
Streams can explicitly represent time.
from('1100,2017\n1200,2018,\n1000,2019',{columns: ['production','year']}) //logically expands to this '{year:2017, production: 1100},{year: 2018, production:1200},{year:2019,production:1000}
Streams can process deal with multiple values concurrenly.
from('EX132,6050,51.6\nEX709,6060,65.2',{columns: ['EquipmentId','Model','SMU']})
Streams should be constructable from csv string, arrays, sql tables, iterators, generators. In the CoatesHire solution we could express ignition status values arriving into the system as:
from(coateshireGenerator)
The values may flow through the stream with no time lag.
,1,,,,2,,3 -=> ,1,,,,2,,3
An log operator can be used to view a stream.
from(',1,,,,2,,3').log()
The values may be delayed through a stream
,1,,,,2,,3,, -=> ,,,1,,,,2,,3
A delay operator would acheive the above translation.
from(',1,,,,2,,3').delay(2).log
We could use to represent the activity of a truck moving a payload from mine face to processing conveyor belt.
from(',1,,,,2,,3').delay(transportTime)
The stream values may accumulate before propagating
,1,,,,2,,3,, -=> ,,,,,3,,3,,
An accumulateTo operator could acheive this accumulation.
from(',1,,,,2,,3').accumulateto(3).log
In EOS Premier a truck can accumulate some load from a digger before moving it to a processing stockpile.
from(',1,,,,2,,3').accumulateto(truckCapaciity)
In EOS Premier it would take a truck some time to return from the stockpile before it can be reloaded.
from(',1,,,,2,,3').compose(accumulateTo(truckCapaciity),delay(truckReturnTrip))
The values may need to be limited before propagating
,1,,,,2,,3,, -=> ,1,,,,1,1,1,1,1
A limitTo operator could facilitate splitting defined chunks
from(',1,,,,2,,3').limitTo(1).log
An Idemitsu (coal mine) conveyor system has a defined throughput.
from(stockpile).limitTo(1)
Streams may need to flow together.
,a:1,a:2,,,,,a:3, ; b:1,,b:2,,b:3, -=> b:1,a:1,a:2:b:2,,,b:3,a:3
Note that the a:1 syntax indicate a key with a value
A merge operator could merge 2 streams.
from(',a:3,a:3,,,,,a:3,').merge(from('b:3,,b:2,,b:3,'))
In Coates Hire there are streams of asset data from Telematics Guru and from Manitou but this source is irrelevant to the UI.
from(',Asset111:on,Asset222:off,,').merge(from('Asset333:on,,Asset333:off,'))
As an alternative a from operation applied to a stream should create a merged stream.
from('1,3'{column: 'odd}).from('2,4',{column: 'event'}) //{odd:1,even:2},{odd:3,even:4},
Streams may need to diverge
b:1,a:1,a:2:b:2,,,b:3,a:3 => ,a:1,a:2,,,,,a:3, ; b:1,,b:2,,b:3,
A split operator would split the stream and return an array of streams
from('b:1,a:1,a:2:b:2,,,b:3,a:3').split([filterAs,filterBs])
In EOS premier a digger may split material into waste and coal stockpiles.
from('waste:1,coal:1,coal:2,,,waste:3,coal:3').split([filterCoal,filterWaste])
A relative to split would be just to duplicate a stream into 2 other streams.
from('1,2,3').duplicate((stream1,stream2)=>stream.log(),stream2.log()).
Streams may need to accumulate into categories
,a:1,a:2,,c:2,,b:3 => { a: 4, b: 3, c:2 }
A table operator would acheive this
from(',a:1,a:2,,c:2,,,b:2,').table(accumulate)
For EOS Premier utilization was accumulated into assets
from(',E16:10,D10:20,,E15:10,,,E16:10,').table(accumulate)
The aggregation into a table should be capable of updating statistics
,a:1,a:3,,b:2,,b:4 => average:{ a: 2, b: 3 }
For Coates Hire we were interested to capture statistics across the type of asset
from(',Asset1:10,Asset11:20,,Asset12:10,,,Asset10:20,').map(mapToAssetType).table(movingAverage,'averageUtilizationByType')
Note that table should accumulate but also emits changes to the tables.
from('A:1,A:10,,B:2,B:2,C:3,').table(accumulate).log() //should generate A:1,A:10,B:2,/*no change*/,C:3
Streams should be named so they can be queried
from(',a:1,a:2,,,,,b:3,c:4').table(accumulate).name('acache')
from('a,a,a,a,').map((e)=>stream('cache').get(x)) //1,1,1,2
When there is a change to a stream it may be desirable to reprocess the content of a table stream. For instance getting new information about depreciation rates could require reprocessing a table stream representing the value of some assets.
from(somestream).table({propagate:change}) //only table changes are propagated.
from(somestream).table({propagate:all}) //any table change generates a stream of the entire table.
Streams may need to accumulate between values
,a:1,b:1,,b:0,,a:0 => { a: 5, b: 3 }
A delta operation might allow this to be expressed
from(',a:1,b:1,,b:0,,a:0').table(compose(delta,accumulate))
In Coates Hire utilization related to time periods between on/off events
from(',Asset111:on,Asset222:on,,Asset222:off,,Asset:on').table(compose(delta,accumulate))
Streams may need to be compared against targets
,1,2,,,3,2,,1 ; 0,2,2,2,2,2,2,2,0 => 0%,50%,100%,0%,0%,0%,150%,100%,0%,0,50%
The from operator handles an arbitrary number of named streams.
from([
from(',1,2,,,3,2,,1','actual'),
from('0,2,2,2,2,2,2,2,0','target')]
).map( ({actual,target})=>actual*100/target)
from(',Asset111:on,Asset222:on,,Asset222:off,,Asset:on').table(compose(delta,accumulate))
In EOS Premier there was a need to show up to date production versus target production during the course of an interval.
It should be possible to define a stream with a source to be re-used multiple times.
let identityStream = map(x=>x) //The identity processing stream
from(testStream).thru(identity) // Used to test the stream definition
from(productionStream2).thru(identity) //Used to process production
Streams should be usable to update functions used in other streams. A stream of market data is used to update a function which can be used to value an individual item.
let valuer = () => ({
let valuationModel = //the model
trainValue: (historicalSaleData)=>{//update model },
value: (equipment)=>valuationModel.value(equipment)
})
from(historicalSaleData).map(x=>valuer.train(x)) //We need to update a valuation model
from(onSaleData).map(x=>valuer.value(x)).table() //then we want to use the model
Young Guns is about scheduling resources to empty and load containers.
There are a stream of container to be unloaded from(port) Containers accumulate stuff .limitTo() Containers are transported delay(). Containers are unloaded splitTo()
There are streams of mine stopes from(mineplan) Stopes need to be cleared splitTo(diggerLoad).accumulateTo(truckCapacitiy) Trucks transport waste or mineral delay(travelTime)
If the flow language existed we could have exposed into the mine portal to allow expression of complex event logic.
Purchase = from('100,,,,,') PlannedMaintenance = from(',-10,-10,-10,-10) UnplannedMaintance = from(',,-5,-7,')
PlannedMaintenace.depreciate(depreciationrate).table(accumulate) UnplannedMaintenace.depreciate(depreciationrate).table(accumulate) merge(Purchase,PlannedMaintance,Unplanned).depreciate(depreciationrate).table(accumulate)
Equipment needs to be valued be composing assets information with depreciation information related to age, utilization , location, marketsupply
function ageValuer {assettype,age} // build linear interpol from asset type and age to a value
from('...',{columns:['assetType','assetAge','value']}).map(ageValuer)
function smuValuer {assetType,smu} //calculate variances of value based on utilization at age and then use deviation from mean utilization to adjust value
from('...',{columns:['assettype','assetSMU','value']}.map(smuvaluer)
function locationValue{assetType,location} //
from('...',{column:['assettype','location','value']}).map(deriveLocationDepreciation)
from('assets.csv').
npm install value-flow
yarn install value-flow
Kafka Stream Most.js reactive programming library View of flow using sankey diagram
FAQs
A library providing an expression language to compose and evaluate stream processing
We found that real-value demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
Axios compromise traced to social engineering, showing how attacks on maintainers can bypass controls and expose the broader software supply chain.

Security News
Node.js has paused its bug bounty program after funding ended, removing payouts for vulnerability reports but keeping its security process unchanged.

Security News
The Axios compromise shows how time-dependent dependency resolution makes exposure harder to detect and contain.