D★Stream

D★Stream is a set of extensions for writing stream-processing code in Ruby.

CAUTION: D★Stream is a work in progress, and of pre-alpha quality.

Examples

Example 1: straightforward

The following example takes a sequence of events for a given ticket and calculates the history for that ticket, using the slowly changing dimensions (SCD) approach:

events =
  Enumerator.new do |y|
    y << { id: 40562348, at: Time.now - 400, status: 'new' }
    y << { id: 40564682, at: Time.now - 300, assignee_id: 2 }
    y << { id: 40565795, at: Time.now - 250, priority: 'high' }
    y << { id: 40569932, at: Time.now - 100, status: 'solved' }
  end.lazy

S = DStream

indices = (1..(1.0 / 0.0)) # an endless range: 1, 2, 3, …

history =
  S.apply(
    events,

    # calculate new state
    S.scan({}, &:merge),

    # add `version`
    S.zip(indices),
    S.map { |(e, i)| e.merge(version: i) },

    # remove `id`
    S.map { |e| e.reject { |k, _v| k == :id } },

    # add `valid_to` and `valid_from`, and remove `at`
    S.with_next,
    S.map { |(a, b)| a.merge(valid_to: b ? b.fetch(:at) : nil) },
    S.map { |e| e.merge(valid_from: e.fetch(:at)) },
    S.map { |e| e.reject { |k, _v| k == :at } },

    # add `row_is_current`
    S.with_next,
    S.map { |(a, b)| a.merge(row_is_current: b.nil?) },
  )

history.each { |e| p e }

The output is as follows:

{
  :status=>"new",
  :valid_from=>2017-05-05 20:18:14 +0200,
  :valid_to=>2017-05-05 20:19:54 +0200,
  :version=>1,
  :row_is_current=>false
}
{
  :status=>"new",
  :assignee_id=>2,
  :valid_from=>2017-05-05 20:19:54 +0200,
  :valid_to=>2017-05-05 20:20:44 +0200,
  :version=>2,
  :row_is_current=>false
}
{
  :status=>"new",
  :assignee_id=>2,
  :priority=>"high",
  :valid_from=>2017-05-05 20:20:44 +0200,
  :valid_to=>2017-05-05 20:23:14 +0200,
  :version=>3,
  :row_is_current=>false
}
{
  :status=>"solved",
  :assignee_id=>2,
  :priority=>"high",
  :valid_from=>2017-05-05 20:23:14 +0200,
  :valid_to=>nil,
  :version=>4,
  :row_is_current=>true
}

Example 2: better factored

This example is functionally identical to the one above, but uses S.compose to group related steps into named processors, making the final processor, history_builder, easier to understand.

events =
  Enumerator.new do |y|
    y << { id: 40562348, at: Time.now - 400, status: 'new' }
    y << { id: 40564682, at: Time.now - 300, assignee_id: 2 }
    y << { id: 40565795, at: Time.now - 250, priority: 'high' }
    y << { id: 40569932, at: Time.now - 100, status: 'solved' }
  end.lazy

S = DStream

merge =
  S.scan({}, &:merge)

indices = (1..(1.0 / 0.0)) # an endless range: 1, 2, 3, …
add_version =
  S.compose(
    S.zip(indices),
    S.map { |(e, i)| e.merge(version: i) },
  )

remove_id =
  S.map { |e| e.reject { |k, _v| k == :id } }

add_valid_dates =
  S.compose(
    S.with_next,
    S.map { |(a, b)| a.merge(valid_to: b ? b.fetch(:at) : nil) },
    S.map { |e| e.merge(valid_from: e.fetch(:at)) },
    S.map { |e| e.reject { |k, _v| k == :at } },
  )

add_row_is_current =
  S.compose(
    S.with_next,
    S.map { |(a, b)| a.merge(row_is_current: b.nil?) },
  )

history_builder =
  S.compose(
    merge,
    add_version,
    remove_id,
    add_valid_dates,
    add_row_is_current,
  )

history = S.apply(events, history_builder)

history.each { |h| p h }

API

The following functions create individual processors:

  • map(&block) (similar to Enumerable#map)

    S.apply((1..5), S.map(&:odd?)).to_a
    # => [true, false, true, false, true]
    
  • select(&block) (similar to Enumerable#select)

    S.apply((1..5), S.select(&:odd?)).to_a
    # => [1, 3, 5]
    
  • reduce(&block) (similar to Enumerable#reduce)

    S.apply((1..5), S.reduce(&:+))
    # => 15
    
  • take(n) (similar to Enumerable#take)

    S.apply((1..10), S.take(3)).to_a
    # => [1, 2, 3]
    
  • zip(other) (similar to Enumerable#zip):

    S.apply((1..3), S.zip((10..13))).to_a
    # => [[1, 10], [2, 11], [3, 12]]
    
  • buffer(size) yields each stream element unchanged, but keeps an internal buffer of up to size not-yet-yielded stream elements. This is useful when reading from a slow and bursty data source, such as a paginated HTTP API.
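    Since buffer has no example above, here is a hypothetical plain-Ruby sketch of the same idea, independent of the D★Stream API: a processor written as a function from a lazy enumerator to a lazy enumerator, where a background thread reads ahead into a bounded queue. The name buffered and all internals are illustrative assumptions, not the gem's implementation.

    ```ruby
    # Hypothetical sketch (not the D★Stream implementation): a buffer-like
    # processor. A reader thread fills a bounded queue of `size` elements
    # ahead of the consumer, which drains it one element at a time.
    def buffered(size)
      lambda do |enum|
        Enumerator.new do |y|
          queue = SizedQueue.new(size)
          reader = Thread.new do
            enum.each { |e| queue << [:elem, e] }
            queue << [:done, nil]
          end
          loop do
            tag, elem = queue.pop
            break if tag == :done
            y << elem
          end
          reader.join
        end.lazy
      end
    end

    p buffered(2).call((1..5).lazy).to_a
    # => [1, 2, 3, 4, 5]
    ```

    The elements come out unchanged; only the read-ahead timing differs.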

  • with_next yields, for each stream element, a two-element array containing that element and the next one; the second element is nil for the last stream element:

    S.apply((1..5), S.with_next).to_a
    # => [[1, 2], [2, 3], [3, 4], [4, 5], [5, nil]]
    
  • scan(init, &block) is similar to reduce, but rather than returning a single aggregated value, returns all intermediate aggregated values:

    S.apply((1..5), S.scan(0, &:+)).to_a
    # => [1, 3, 6, 10, 15]
    
  • flatten2 yields each stream element as-is if it is not an array, and otherwise yields each of the array’s elements individually:

    S.apply((1..5), S.with_next, S.flatten2).to_a
    # => [1, 2, 2, 3, 3, 4, 4, 5, 5, nil]
    

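For readers curious how such processors can be built, here is a hedged plain-Ruby sketch with with_next-like semantics, written as a function from a lazy enumerator to a lazy enumerator. The name with_next_sketch is illustrative, not part of the gem:

```ruby
# Hypothetical sketch (not the D★Stream implementation): pair each
# element with its successor, using nil after the last element.
with_next_sketch = lambda do |enum|
  Enumerator.new do |y|
    prev = nil
    have_prev = false
    enum.each do |e|
      y << [prev, e] if have_prev
      prev = e
      have_prev = true
    end
    y << [prev, nil] if have_prev
  end.lazy
end

p with_next_sketch.call((1..5).lazy).to_a
# => [[1, 2], [2, 3], [3, 4], [4, 5], [5, nil]]
```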
To apply one or more processors to a stream, use .apply:

S = DStream

stream = ['hi']

S.apply(stream, S.map(&:upcase)).to_a
# => ["HI"]

To combine one or more processors, use .compose:

S = DStream

stream = ['hi']

processor = S.compose(
  S.map(&:upcase),
  S.map(&:reverse),
)

S.apply(stream, processor).to_a
# => ["IH"]
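Conceptually, composition of processors is ordinary function composition over lazy enumerators. As a hedged plain-Ruby sketch, assuming each processor is a function from a lazy enumerator to a lazy enumerator (the names below are illustrative, not the gem's internals):

```ruby
# Hypothetical sketch (not the D★Stream implementation): compose chains
# processors left to right by threading the enumerator through each one.
compose = lambda do |*procs|
  lambda { |enum| procs.reduce(enum) { |acc, pr| pr.call(acc) } }
end

upcase_all  = ->(enum) { enum.map(&:upcase) }
reverse_all = ->(enum) { enum.map(&:reverse) }

processor = compose.call(upcase_all, reverse_all)
p processor.call(['hi'].lazy).to_a
# => ["IH"]
```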