Class: Streamer::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/streamer/stream.rb

Overview

Stream is the object responsible for mutating the data passed to it.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(hash) ⇒ Stream

Returns a new instance of Stream.



8
9
10
# File 'lib/streamer/stream.rb', line 8

# Creates a stream around the given data.
#
# The argument is stored as-is in @payload (no copy is made), so it is the
# same object the caller passed in.
#
# @param hash [Object] data to operate on; the other methods of this class
#   call #dig and #[]= on it, so a Hash is presumably expected
def initialize(hash)
  @payload = hash
end

Instance Attribute Details

#finder ⇒ Object

Returns the value of attribute finder.



7
8
9
# File 'lib/streamer/stream.rb', line 7

# Reader for the @finder instance variable.
#
# NOTE(review): none of the methods shown on this page assign @finder;
# presumably it is set through a generated writer -- confirm in
# lib/streamer/stream.rb.
#
# @return [Object] the current value of @finder
def finder
  @finder
end

#payload ⇒ Object (readonly)

Returns the value of attribute payload.



6
7
8
# File 'lib/streamer/stream.rb', line 6

# Reader for the data this stream operates on.
#
# @return [Object] the object stored in @payload by the constructor
#   (the same object, not a copy)
def payload
  @payload
end

Instance Method Details

#assign(property:, value: nil, function: nil) ⇒ Object



16
17
18
19
20
21
22
23
24
# File 'lib/streamer/stream.rb', line 16

# Writes a value into the payload at a dot-separated path.
#
# The property is converted with #to_s and split on '.', then handed to
# #assign_property together with the payload.
#
# @param property [#to_s] dot-separated path, e.g. 'a.b.c'
# @param value [Object, nil] forwarded unchanged to #assign_property
# @param function [Object, nil] forwarded unchanged to #assign_property
# @return [self] so calls can be chained
def assign(property:, value: nil, function: nil)
  path = property.to_s.split('.')
  assign_property(structure: payload, properties: path, value: value, function: function)
  self
end

#assign_each(list:, property:, value: nil, function: nil) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/streamer/stream.rb', line 38

# Sets a property on every item of a list located inside the payload.
#
# @param list [#to_s] dot-separated path to the list within the payload
# @param property [Object] key written on each item; used as-is (it is NOT
#   split on '.')
# @param value [Object, nil] written to each item unless nil
# @param function [Hash, nil] functor options; when given, the functor is
#   built per item (after #replace_terms has resolved its terms against that
#   item) and its result is written last, overriding +value+
# @return [self] so calls can be chained
def assign_each(list:, property:, value: nil, function: nil)
  # to_s mirrors #assign so a Symbol path works too; a path that resolves to
  # nothing is treated as an empty list instead of raising NoMethodError.
  items = payload.dig(*list.to_s.split('.')) || []
  items.each do |item|
    item[property] = value unless value.nil?
    item[property] = functor(replace_terms(item, function)).call if function
  end
  self
end

#assign_property(structure:, properties:, value: nil, function: nil) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/streamer/stream.rb', line 26

# Walks +structure+ along +properties+ and writes at the final key.
#
# Every key except the last is descended into; a falsy entry on the way is
# replaced by a new empty Hash. At the last key, +value+ is written unless
# it is nil, and then the result of the functor built from +function+ is
# written if +function+ is given (so the function wins when both are set).
# An empty +properties+ list leaves +structure+ untouched.
#
# @param structure [Hash] container that is modified in place
# @param properties [Array] path of keys, outermost first
# @param value [Object, nil] value to store; nil means "do not store"
# @param function [Object, nil] options handed to #functor
# @return [Array] the +properties+ list that was passed in
def assign_property(structure:, properties:, value: nil, function: nil)
  return properties if properties.empty?

  *parents, leaf = properties
  # `||=` yields the existing (truthy) child or the freshly stored Hash.
  target = parents.reduce(structure) { |node, key| node[key] ||= {} }
  target[leaf] = value unless value.nil?
  target[leaf] = functor(function).call if function
  properties
end

#filter(function:) ⇒ Object



12
13
14
# File 'lib/streamer/stream.rb', line 12

# Evaluates a functor against the payload and stores its result under the
# 'filter_value' property.
#
# @param function [Object] options handed to #functor
# @return [Object] whatever #assign returns
def filter(function:)
  filter_result = functor(function).call
  assign(property: 'filter_value', value: filter_result)
end

#functor(options = {}, pl = payload) ⇒ Object



59
60
61
# File 'lib/streamer/stream.rb', line 59

# Builds a Streamer::Functors::Functor.
#
# @param options [Object] passed as the functor's second constructor
#   argument; defaults to an empty Hash
# @param pl [Object] passed as the functor's first constructor argument;
#   defaults to this stream's payload
# @return [Streamer::Functors::Functor] callers in this class invoke #call
#   on the result
def functor(options = {}, pl = payload)
  Streamer::Functors::Functor.new(pl, options)
end

#replace_terms(item, function_hash) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/streamer/stream.rb', line 46

# Returns a deep copy of +function_hash+ whose :terms have been resolved
# against +item+.
#
# A String term starting with '#' is treated as a dot-separated path: the
# '#' is dropped and the remainder is looked up in +item+ with #dig. Every
# other term is kept as it is. The input hash is never modified.
#
# @param item [#dig] object the '#'-prefixed terms are looked up in
# @param function_hash [Hash] functor options containing a :terms list
# @return [Hash] the copied options with the resolved :terms
def replace_terms(item, function_hash)
  # Marshal round-trip gives an independent deep copy of the options.
  Marshal.load(Marshal.dump(function_hash)).tap do |resolved|
    resolved[:terms] = resolved[:terms].map do |term|
      next term unless term.is_a?(String) && term.start_with?('#')

      item.dig(*term[1..-1].split('.'))
    end
  end
end