Class: Patty::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/patty/base.rb

Instance Method Summary collapse

Constructor Details

#initialize(storage = nil) ⇒ Base

Returns a new instance of Base.



8
9
10
# File 'lib/patty/base.rb', line 8

def initialize(storage = nil)
  @storage = Patty::AggregatedStorage.new Patty::Storages::Riak.new(title)
end

Instance Method Details

#build_signature(datetime_string = '') ⇒ Object



24
25
26
# File 'lib/patty/base.rb', line 24

def build_signature(datetime_string = '')
  Patty::TimeSignature.new(datetime_string[0, 19]).align.measure measure
end

#emit(signature = nil, value = nil, marker = nil) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/patty/base.rb', line 28

def emit(signature = nil, value = nil, marker = nil)
  if signature.nil?
    signature = Patty::TimeSignature.from_datetime
  end

  current_value = fetch signature, marker

  if current_value.nil?
    current_value = value
  else
    current_value = reduce [current_value, value]
  end

  @storage.put signature, current_value, marker
end

#fetch(signature = nil, marker = nil) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/patty/base.rb', line 52

def fetch(signature = nil, marker = nil)
  current_value = @storage.get signature, marker

  if current_value.nil?
    data = signature.children.inject([]) do |acc, child|
      child_value = fetch(child, marker)
      acc << child_value
      acc
    end

    current_value = reduce data

    @storage.put signature, current_value, marker
  end

  current_value
end

#flowObject



16
17
18
# File 'lib/patty/base.rb', line 16

def flow
  'events'
end

#map(flow_title = '', signature = nil, record = nil) ⇒ Object



44
45
46
# File 'lib/patty/base.rb', line 44

def map(flow_title = '', signature = nil, record = nil)
  emit(signature, value, record['marker'])
end

#measureObject



20
21
22
# File 'lib/patty/base.rb', line 20

def measure
  :tminute
end

#reduce(data = []) ⇒ Object



48
49
50
# File 'lib/patty/base.rb', line 48

def reduce(data = [])
  data.inject(0){ |acc, item| acc += item }
end

#runObject



70
71
72
73
74
75
76
# File 'lib/patty/base.rb', line 70

def run
  transport = Waffle::Base.new eval("Waffle::Transports::#{Waffle::Config.transport.capitalize}").new

  transport.subscribe flow do |flow_title, event|
    map flow_title, build_signature(event['occured_at']), event
  end
end

#titleObject



12
13
14
# File 'lib/patty/base.rb', line 12

def title
  self.class.to_s.downcase
end