Class: Distribot::Flow

Inherits:
Object
  • Object
show all
Defined in:
lib/distribot/flow.rb

Overview

rubocop:disable ClassLength

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(attrs = {}) ⇒ Flow

Returns a new instance of Flow.



10
11
12
13
14
15
16
17
18
# File 'lib/distribot/flow.rb', line 10

# Builds a flow from a hash of attributes.
#
# @param attrs [Hash] keys read here: :id, :created_at, :phases and :data;
#   every entry of :phases is passed to #add_phase
def initialize(attrs = {})
  self.id = attrs[:id]

  # Only assign created_at when the caller actually supplied a value.
  supplied_created_at = attrs[:created_at]
  self.created_at = supplied_created_at unless supplied_created_at.nil?

  self.phases = []
  phase_definitions = attrs[:phases] || []
  phase_definitions.each { |options| add_phase(options) }

  self.data = attrs[:data]
end

Instance Attribute Details

#consumer ⇒ Object

Returns the value of attribute consumer.



8
9
10
# File 'lib/distribot/flow.rb', line 8

# Reader for the consumer attribute.
#
# @return [Object] the value held in @consumer (nil if it was never assigned)
def consumer
  @consumer
end

#created_at ⇒ Object

Returns the value of attribute created_at.



8
9
10
# File 'lib/distribot/flow.rb', line 8

# Reader for the created_at attribute.
#
# @return [Object] the value held in @created_at (nil if it was never assigned)
def created_at
  @created_at
end

#data ⇒ Object

Returns the value of attribute data.



8
9
10
# File 'lib/distribot/flow.rb', line 8

# Reader for the data attribute.
#
# @return [Object] the value held in @data (nil if it was never assigned)
def data
  @data
end

#finished_callback_queue ⇒ Object

Returns the value of attribute finished_callback_queue.



8
9
10
# File 'lib/distribot/flow.rb', line 8

# Reader for the finished_callback_queue attribute.
#
# @return [Object] the value held in @finished_callback_queue (nil if it was
#   never assigned)
def finished_callback_queue
  @finished_callback_queue
end

#id ⇒ Object

Returns the value of attribute id.



8
9
10
# File 'lib/distribot/flow.rb', line 8

# Reader for the id attribute.
#
# @return [Object] the value held in @id (nil if it was never assigned)
def id
  @id
end

#phases ⇒ Object

Returns the value of attribute phases.



8
9
10
# File 'lib/distribot/flow.rb', line 8

# Reader for the phases attribute.
#
# @return [Object] the value held in @phases (nil if it was never assigned)
def phases
  @phases
end

Class Method Details

.active ⇒ Object



20
21
22
23
24
# File 'lib/distribot/flow.rb', line 20

# Loads every flow whose id is a member of the 'distribot.flows.active' set.
#
# @return [Array] the result of .find for each stored id, in the order the
#   ids were returned by redis
def self.active
  active_ids = redis.smembers('distribot.flows.active')
  active_ids.map { |flow_id| find(flow_id) }
end

.create!(attrs = {}) ⇒ Object



26
27
28
# File 'lib/distribot/flow.rb', line 26

# Builds a flow from +attrs+ and persists it straight away.
#
# @param attrs [Hash] attributes forwarded unchanged to .new
# @return [Object] whatever #save! returns for the new flow
def self.create!(attrs = {})
  flow = new(attrs)
  flow.save!
end

.find(id) ⇒ Object



54
55
56
57
58
59
60
# File 'lib/distribot/flow.rb', line 54

# Rebuilds a flow from the JSON definition stored for +id+.
#
# @param id [Object] the flow id, handed to Distribot.redis_id
# @return [Object, nil] a new instance built from the parsed definition
#   (keys symbolized), or nil when nothing is stored under the definition key
def self.find(id)
  definition_key = "#{Distribot.redis_id('flow', id)}:definition"
  raw_json = redis.get(definition_key)
  return unless raw_json

  new(JSON.parse(raw_json, symbolize_names: true))
end

Instance Method Details

#add_phase(options = {}) ⇒ Object



62
63
64
# File 'lib/distribot/flow.rb', line 62

# Appends a new Phase built from +options+ to this flow's phases.
#
# @param options [Hash] passed unchanged to Phase.new
# @return [Object] the phases collection after the append
def add_phase(options = {})
  phases.push(Phase.new(options))
end

#add_transition(item) ⇒ Object



126
127
128
# File 'lib/distribot/flow.rb', line 126

# Stores +item+, serialized as JSON, in this flow's ':transitions' redis set.
#
# @param item [#to_json] the transition record to store
# @return [Object] whatever redis.sadd returns
def add_transition(item)
  store = redis
  transitions_key = redis_id + ':transitions'
  store.sadd(transitions_key, item.to_json)
end

#cancel! ⇒ Object



94
95
96
97
98
99
100
101
# File 'lib/distribot/flow.rb', line 94

# Cancels a running flow.
#
# Records a transition from the current phase to 'canceled', decrements the
# 'distribot.flows.running' counter and removes this flow's id from the
# 'distribot.flows.active' set.
#
# @raise [NotRunningError] when #running? is false
# @return [Object] whatever redis.srem returns
def cancel!
  raise NotRunningError, 'Cannot cancel unless running' unless running?

  origin = current_phase
  add_transition(from: origin, to: 'canceled', timestamp: Time.now.utc.to_f)
  redis.decr('distribot.flows.running')
  redis.srem('distribot.flows.active', id)
end

#canceled? ⇒ Boolean

Returns:

  • (Boolean)


103
104
105
# File 'lib/distribot/flow.rb', line 103

# Tells whether the flow's current phase is 'canceled'.
#
# @return [Boolean]
def canceled?
  state = current_phase
  state == 'canceled'
end

#current_phase ⇒ Object



136
137
138
139
140
141
142
143
# File 'lib/distribot/flow.rb', line 136

# Names the phase the flow is currently in.
#
# @return [Object] the +to+ of the most recent transition, or, when no
#   transition exists yet, the name of the first phase flagged is_initial
def current_phase
  latest = transitions.last
  return latest.to if latest

  phases.find(&:is_initial).name
end

#finished? ⇒ Boolean

Returns:

  • (Boolean)


150
151
152
# File 'lib/distribot/flow.rb', line 150

# Tells whether the most recent transition landed on a final phase.
#
# The target of the latest transition is not always a declared phase:
# #pause! and #cancel! record transitions to 'paused' and 'canceled', and a
# flow may have no transitions at all. Those cases answer false rather than
# raising NoMethodError on nil.
#
# @return [Boolean] the is_final flag of the phase named by the latest
#   transition; false when there is no transition or no phase by that name
def finished?
  latest = transitions.last
  return false unless latest

  landed_on = phase(latest.to)
  landed_on ? landed_on.is_final : false
end

#next_phaseObject



145
146
147
148
# File 'lib/distribot/flow.rb', line 145

# Names the phase that follows the current one.
#
# @return [Object] the transitions_to value of the phase whose name equals
#   #current_phase
def next_phase
  phase(current_phase).transitions_to
end

#pause! ⇒ Object



70
71
72
73
74
75
76
77
# File 'lib/distribot/flow.rb', line 70

# Pauses a running flow by recording a transition from the current phase to
# 'paused'.
#
# @raise [NotRunningError] when #running? is false
# @return [Object] whatever #add_transition returns
def pause!
  raise NotRunningError, 'Cannot pause unless running' unless running?

  origin = current_phase
  add_transition(from: origin, to: 'paused', timestamp: Time.now.utc.to_f)
end

#paused? ⇒ Boolean

Returns:

  • (Boolean)


90
91
92
# File 'lib/distribot/flow.rb', line 90

# Tells whether the flow's current phase is 'paused'.
#
# @return [Boolean]
def paused?
  state = current_phase
  state == 'paused'
end

#phase(name) ⇒ Object



66
67
68
# File 'lib/distribot/flow.rb', line 66

# Looks up one of this flow's phases by name.
#
# @param name [Object] compared with each phase's name using ==
# @return [Object, nil] the first matching phase, or nil when none matches
def phase(name)
  phases.find { |candidate| candidate.name == name }
end

#redis_id ⇒ Object



111
112
113
# File 'lib/distribot/flow.rb', line 111

# The redis key prefix for this flow, as produced by
# Distribot.redis_id('flow', id).
#
# The value is cached in @redis_id once it is truthy, so later calls skip the
# lookup.
#
# @return [Object] the cached or freshly computed key prefix
def redis_id
  return @redis_id if @redis_id

  @redis_id = Distribot.redis_id('flow', id)
end

#resume! ⇒ Object



79
80
81
82
83
84
85
86
87
88
# File 'lib/distribot/flow.rb', line 79

# Resumes a paused flow by recording a transition from 'paused' back to the
# phase it was in before pausing.
#
# @raise [NotPausedError] when #paused? is false
# @return [Object] whatever #add_transition returns
def resume!
  raise NotPausedError, 'Cannot resume unless paused' unless paused?

  # Walking the history newest-first, the first transition that does not
  # target 'paused' names the phase the flow was in before it was paused.
  before_pause = transitions.reverse_each.find { |entry| entry.to != 'paused' }
  resume_target = before_pause.to
  add_transition(from: 'paused', to: resume_target, timestamp: Time.now.utc.to_f)
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


107
108
109
# File 'lib/distribot/flow.rb', line 107

# Tells whether the flow is neither paused, canceled nor finished.
#
# The three checks run in that order and stop at the first one that is true.
#
# @return [Boolean]
def running?
  !paused? && !canceled? && !finished?
end

#save!(&block) ⇒ Object

rubocop:disable Metrics/AbcSize



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/distribot/flow.rb', line 31

# Persists a brand-new flow and announces it.
#
# In order: assigns a fresh UUID as the id, stamps created_at, writes the
# serialized definition to redis, records the first transition, adds the id
# to 'distribot.flows.active', increments 'distribot.flows.running', and
# publishes 'distribot.flow.created'.
#
# @param block optional block; when given it is handed to
#   wait_for_flow_to_finish
# @raise [StandardError] when the flow already has an id
# @return [self]
def save!(&block)
  raise StandardError, 'Cannot re-save a flow' if id

  self.id = SecureRandom.uuid
  definition_key = redis_id + ':definition'
  self.created_at = Time.now.to_f

  # Write the serialized definition under its own key.
  redis.set(definition_key, serialize)

  # Record the entry into the first phase.
  add_transition(to: current_phase, timestamp: Time.now.utc.to_f)

  # Register this flow as active and bump the running counter.
  redis.sadd('distribot.flows.active', id)
  redis.incr('distribot.flows.running')

  # Tell the rest of the system that the flow now exists.
  Distribot.publish!('distribot.flow.created', flow_id: id)

  wait_for_flow_to_finish(block) if block
  self
end

#stubbornly(task, &block) ⇒ Object



154
155
156
157
158
159
160
161
162
163
# File 'lib/distribot/flow.rb', line 154

# Calls the block repeatedly until it completes without a NoMethodError.
#
# Each NoMethodError is reported through warn (message and backtrace) and is
# followed by a one-second sleep before the next attempt. There is no limit
# on the number of attempts. Any other exception propagates to the caller.
#
# @param task [Object] label interpolated into the warning text
# @return [Object] the value of the first block call that succeeds
def stubbornly(task, &block)
  loop do
    begin
      outcome = block.call
      return outcome
    rescue NoMethodError => error
      warn "Error during #{task}: #{error} --- #{error.backtrace.join("\n")}"
      sleep 1
    end
  end
end

#transition_to!(phase) ⇒ Object



115
116
117
118
119
120
121
122
123
124
# File 'lib/distribot/flow.rb', line 115

# Records a transition into +phase+ and publishes
# 'distribot.flow.phase.started' for it.
#
# The recorded +from+ is the +to+ of the latest existing transition, or nil
# when there is none.
#
# @param phase [Object] the name of the phase being entered
# @return [Object] whatever Distribot.publish! returns
def transition_to!(phase)
  latest = transitions.last
  origin = latest[:to] if latest
  add_transition(from: origin, to: phase, timestamp: Time.now.utc.to_f)
  Distribot.publish!('distribot.flow.phase.started', flow_id: id, phase: phase)
end

#transitions ⇒ Object



130
131
132
133
134
# File 'lib/distribot/flow.rb', line 130

# Reads this flow's transition history from its ':transitions' redis set.
#
# @return [Array<OpenStruct>] one OpenStruct per stored JSON member (keys
#   symbolized), sorted by ascending timestamp
def transitions
  raw_items = redis.smembers(redis_id + ':transitions')
  parsed = raw_items.map do |json|
    OpenStruct.new(JSON.parse(json, symbolize_names: true))
  end
  parsed.sort_by(&:timestamp)
end