# D★Stream

D★Stream is a set of extensions for writing stream-processing code in Ruby.
CAUTION: D★Stream is a work in progress and of pre-alpha quality.
## Examples

### Example 1: straightforward
The following example takes a sequence of events for a given ticket, and calculates the history for that ticket, using slowly changing dimensions:
```ruby
events =
  Enumerator.new do |y|
    y << { id: 40562348, at: Time.now - 400, status: 'new' }
    y << { id: 40564682, at: Time.now - 300, assignee_id: 2 }
    y << { id: 40565795, at: Time.now - 250, priority: 'high' }
    y << { id: 40569932, at: Time.now - 100, status: 'solved' }
  end.lazy

S = DStream

indices = (1..(1.0 / 0.0))

history =
  S.apply(
    events,
    S.scan({}, &:merge),
    S.zip(indices),
    S.map { |(e, i)| e.merge(version: i) },
    S.map { |e| e.reject { |k, _v| k == :id } },
    S.with_next,
    S.map { |(a, b)| a.merge(valid_to: b ? b.fetch(:at) : nil) },
    S.map { |e| e.merge(valid_from: e.fetch(:at)) },
    S.map { |e| e.reject { |k, _v| k == :at } },
    S.with_next,
    S.map { |(a, b)| a.merge(row_is_current: b.nil?) },
  )

history.each { |e| p e }
```
The output is as follows:
```
{
  :status=>"new",
  :valid_from=>2017-05-05 20:18:14 +0200,
  :valid_to=>2017-05-05 20:19:54 +0200,
  :version=>1,
  :row_is_current=>false
}
{
  :status=>"new",
  :assignee_id=>2,
  :valid_from=>2017-05-05 20:19:54 +0200,
  :valid_to=>2017-05-05 20:20:44 +0200,
  :version=>2,
  :row_is_current=>false
}
{
  :status=>"new",
  :assignee_id=>2,
  :priority=>"high",
  :valid_from=>2017-05-05 20:20:44 +0200,
  :valid_to=>2017-05-05 20:23:14 +0200,
  :version=>3,
  :row_is_current=>false
}
{
  :status=>"solved",
  :assignee_id=>2,
  :priority=>"high",
  :valid_from=>2017-05-05 20:23:14 +0200,
  :valid_to=>nil,
  :version=>4,
  :row_is_current=>true
}
```
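The backbone of this pipeline is the `S.scan({}, &:merge)` step, which folds each event into an accumulated state and emits every intermediate state. That running merge can be sketched in plain Ruby, without DStream (the names here are illustrative):

```ruby
# A running merge over event hashes: each event is folded into the
# accumulated state, and every intermediate state is emitted.
events = [
  { status: 'new' },
  { assignee_id: 2 },
  { priority: 'high' },
]

state = {}
versions = events.map { |e| state = state.merge(e) }

p versions
# => [{:status=>"new"},
#     {:status=>"new", :assignee_id=>2},
#     {:status=>"new", :assignee_id=>2, :priority=>"high"}]
```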
### Example 2: better factored

This example is functionally identical to the one above, but uses `S.compose` to make the final processor, `history_builder`, easier to understand.
```ruby
events =
  Enumerator.new do |y|
    y << { id: 40562348, at: Time.now - 400, status: 'new' }
    y << { id: 40564682, at: Time.now - 300, assignee_id: 2 }
    y << { id: 40565795, at: Time.now - 250, priority: 'high' }
    y << { id: 40569932, at: Time.now - 100, status: 'solved' }
  end.lazy

S = DStream

merge =
  S.scan({}, &:merge)

indices = (1..(1.0 / 0.0))

add_version =
  S.compose(
    S.zip(indices),
    S.map { |(e, i)| e.merge(version: i) },
  )

remove_id =
  S.map { |e| e.reject { |k, _v| k == :id } }

add_valid_dates =
  S.compose(
    S.with_next,
    S.map { |(a, b)| a.merge(valid_to: b ? b.fetch(:at) : nil) },
    S.map { |e| e.merge(valid_from: e.fetch(:at)) },
    S.map { |e| e.reject { |k, _v| k == :at } },
  )

add_row_is_current =
  S.compose(
    S.with_next,
    S.map { |(a, b)| a.merge(row_is_current: b.nil?) },
  )

history_builder =
  S.compose(
    merge,
    add_version,
    remove_id,
    add_valid_dates,
    add_row_is_current,
  )

history = S.apply(events, history_builder)

history.each { |h| p h }
```
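The `add_version` step relies on lazily zipping the stream with an infinite index range. That part works in plain Ruby as well, since `Enumerator::Lazy#zip` only consumes as many indices as there are events (a standalone sketch, not DStream code):

```ruby
# Lazily zip a finite event stream with an infinite index range and
# stamp each event with its position as a version number.
indices = (1..Float::INFINITY)
events = [{ status: 'new' }, { assignee_id: 2 }].lazy

versioned =
  events
    .zip(indices)
    .map { |(e, i)| e.merge(version: i) }
    .to_a

p versioned
# => [{:status=>"new", :version=>1}, {:assignee_id=>2, :version=>2}]
```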
## API

The following functions create individual processors:
- `map(&block)` (similar to `Enumerable#map`):

  ```ruby
  S.apply((1..5), S.map(&:odd?)).to_a
  ```

- `select(&block)` (similar to `Enumerable#select`):

  ```ruby
  S.apply((1..5), S.select(&:odd?)).to_a
  ```

- `reduce(&block)` (similar to `Enumerable#reduce`):

  ```ruby
  S.apply((1..5), S.reduce(&:+))
  ```

- `take(n)` (similar to `Enumerable#take`):

  ```ruby
  S.apply((1..10), S.take(3)).to_a
  ```

- `zip(other)` (similar to `Enumerable#zip`):

  ```ruby
  S.apply((1..3), S.zip((10..13))).to_a
  ```

- `buffer(size)` yields each stream element, but keeps an internal buffer of not-yet-yielded stream elements. This is useful when reading from a slow and bursty data source, such as a paginated HTTP API.

- `with_next` yields arrays pairing each stream element with the element that follows it, using `nil` when the end of the stream is reached:

  ```ruby
  S.apply((1..5), S.with_next).to_a
  ```

- `scan(init, &block)` is similar to `reduce`, but rather than returning a single aggregated value, yields every intermediate aggregated value:

  ```ruby
  S.apply((1..5), S.scan(0, &:+)).to_a
  ```

- `flatten2` yields each non-array stream element as-is, and yields the contents of each array stream element one by one:

  ```ruby
  S.apply((1..5), S.with_next, S.flatten2).to_a
  ```
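As a point of reference for `with_next`: an eager plain-Ruby equivalent (for illustration only, not how DStream implements it) appends `nil` to the stream and takes consecutive pairs:

```ruby
# Eager sketch of with_next: append nil, then pair consecutive elements.
def with_next(enum)
  (enum.to_a + [nil]).each_cons(2).to_a
end

p with_next(1..5)
# => [[1, 2], [2, 3], [3, 4], [4, 5], [5, nil]]
```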
To apply one or more processors to a stream, use `.apply`:

```ruby
S = DStream

stream = ['hi']

S.apply(stream, S.map(&:upcase)).to_a
```
To combine one or more processors into a single processor, use `.compose`:

```ruby
S = DStream

stream = ['hi']

processor = S.compose(
  S.map(&:upcase),
  S.map(&:reverse),
)

S.apply(stream, processor).to_a
```
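Reading the two helpers together: if a processor is modelled as a function from lazy stream to lazy stream, then `.compose` chains such functions left to right, and `.apply` is composition followed by a call on the stream. A minimal self-contained sketch of that model (plain lambdas, not DStream's actual implementation):

```ruby
# Sketch: processors as stream -> stream lambdas; compose chains them
# left to right, and apply(stream, *procs) == compose(*procs).(stream).
compose = ->(*processors) {
  ->(stream) { processors.reduce(stream) { |s, p| p.call(s) } }
}
apply = ->(stream, *processors) { compose.call(*processors).call(stream) }

map = ->(block) { ->(stream) { stream.map(&block) } }

result =
  apply.call(['hi'].lazy, map.call(:upcase.to_proc), map.call(:reverse.to_proc)).to_a

p result
# => ["IH"]
```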