Class: Distribot::Flow
- Inherits: Object
- Inheritance chain: Object → Distribot::Flow
- Defined in:
- lib/distribot/flow.rb
Overview
rubocop:disable ClassLength
Instance Attribute Summary collapse
-
#consumer ⇒ Object
Returns the value of attribute consumer.
-
#created_at ⇒ Object
Returns the value of attribute created_at.
-
#data ⇒ Object
Returns the value of attribute data.
-
#finished_callback_queue ⇒ Object
Returns the value of attribute finished_callback_queue.
-
#id ⇒ Object
Returns the value of attribute id.
-
#phases ⇒ Object
Returns the value of attribute phases.
Class Method Summary collapse
- .active ⇒ Object
- .create!(attrs = {}) ⇒ Object
Instance Method Summary collapse
- #add_phase(options = {}) ⇒ Object
- #add_transition(item) ⇒ Object
- #cancel! ⇒ Object
- #canceled? ⇒ Boolean
- #current_phase ⇒ Object
- #finished? ⇒ Boolean
-
#initialize(attrs = {}) ⇒ Flow
constructor
A new instance of Flow.
- #next_phase ⇒ Object
- #pause! ⇒ Object
- #paused? ⇒ Boolean
- #phase(name) ⇒ Object
- #redis_id ⇒ Object
- #resume! ⇒ Object
- #running? ⇒ Boolean
-
#save!(&block) ⇒ Object
rubocop:disable Metrics/AbcSize.
- #stubbornly(task, &block) ⇒ Object
- #transition_to!(phase) ⇒ Object
- #transitions ⇒ Object
Constructor Details
#initialize(attrs = {}) ⇒ Flow
Returns a new instance of Flow.
10 11 12 13 14 15 16 17 18 |
# File 'lib/distribot/flow.rb', line 10 def initialize(attrs = {}) self.id = attrs[:id] self.created_at = attrs[:created_at] unless attrs[:created_at].nil? self.phases = [] (attrs[:phases] || []).each do |options| add_phase(options) end self.data = attrs[:data] end |
Instance Attribute Details
#consumer ⇒ Object
Returns the value of attribute consumer.
8 9 10 |
# File 'lib/distribot/flow.rb', line 8 def consumer @consumer end |
#created_at ⇒ Object
Returns the value of attribute created_at.
8 9 10 |
# File 'lib/distribot/flow.rb', line 8 def created_at @created_at end |
#data ⇒ Object
Returns the value of attribute data.
8 9 10 |
# File 'lib/distribot/flow.rb', line 8 def data @data end |
#finished_callback_queue ⇒ Object
Returns the value of attribute finished_callback_queue.
8 9 10 |
# File 'lib/distribot/flow.rb', line 8 def finished_callback_queue @finished_callback_queue end |
#id ⇒ Object
Returns the value of attribute id.
8 9 10 |
# File 'lib/distribot/flow.rb', line 8 def id @id end |
#phases ⇒ Object
Returns the value of attribute phases.
8 9 10 |
# File 'lib/distribot/flow.rb', line 8 def phases @phases end |
Class Method Details
.active ⇒ Object
20 21 22 23 24 |
# File 'lib/distribot/flow.rb', line 20 def self.active redis.smembers('distribot.flows.active').map do |id| self.find(id) end end |
.create!(attrs = {}) ⇒ Object
26 27 28 |
# File 'lib/distribot/flow.rb', line 26 def self.create!(attrs = {}) new(attrs).save! end |
Instance Method Details
#add_phase(options = {}) ⇒ Object
62 63 64 |
# File 'lib/distribot/flow.rb', line 62 def add_phase(options = {}) phases << Phase.new(options) end |
#add_transition(item) ⇒ Object
126 127 128 |
# File 'lib/distribot/flow.rb', line 126 def add_transition(item) redis.sadd(redis_id + ':transitions', item.to_json) end |
#cancel! ⇒ Object
94 95 96 97 98 99 100 101 |
# File 'lib/distribot/flow.rb', line 94 def cancel! fail NotRunningError, 'Cannot cancel unless running' unless running? add_transition( from: current_phase, to: 'canceled', timestamp: Time.now.utc.to_f ) redis.decr 'distribot.flows.running' redis.srem 'distribot.flows.active', id end |
#canceled? ⇒ Boolean
103 104 105 |
# File 'lib/distribot/flow.rb', line 103 def canceled? current_phase == 'canceled' end |
#current_phase ⇒ Object
136 137 138 139 140 141 142 143 |
# File 'lib/distribot/flow.rb', line 136 def current_phase latest_transition = transitions.last if latest_transition latest_transition.to else phases.find(&:is_initial).name end end |
#finished? ⇒ Boolean
150 151 152 |
# File 'lib/distribot/flow.rb', line 150 def finished? phase(transitions.last.to).is_final end |
#next_phase ⇒ Object
145 146 147 148 |
# File 'lib/distribot/flow.rb', line 145 def next_phase current = current_phase phases.find { |x| x.name == current }.transitions_to end |
#pause! ⇒ Object
70 71 72 73 74 75 76 77 |
# File 'lib/distribot/flow.rb', line 70 def pause! fail NotRunningError, 'Cannot pause unless running' unless running? add_transition( from: current_phase, to: 'paused', timestamp: Time.now.utc.to_f ) end |
#paused? ⇒ Boolean
90 91 92 |
# File 'lib/distribot/flow.rb', line 90 def paused? current_phase == 'paused' end |
#phase(name) ⇒ Object
66 67 68 |
# File 'lib/distribot/flow.rb', line 66 def phase(name) phases.find { |x| x.name == name } end |
#redis_id ⇒ Object
111 112 113 |
# File 'lib/distribot/flow.rb', line 111 def redis_id @redis_id ||= Distribot.redis_id('flow', id) end |
#resume! ⇒ Object
79 80 81 82 83 84 85 86 87 88 |
# File 'lib/distribot/flow.rb', line 79
def resume!
  fail NotPausedError, 'Cannot resume unless paused' unless paused?

  # Find the last transition before we were paused:
  prev_phase = transitions.reverse.find { |x| x.to != 'paused' }
  # Back to where we once belonged
  add_transition(
    from: 'paused', to: prev_phase.to, timestamp: Time.now.utc.to_f
  )
end
#running? ⇒ Boolean
107 108 109 |
# File 'lib/distribot/flow.rb', line 107 def running? ! (paused? || canceled? || finished?) end |
#save!(&block) ⇒ Object
rubocop:disable Metrics/AbcSize
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/distribot/flow.rb', line 31
def save!(&block)
  fail StandardError, 'Cannot re-save a flow' if id
  self.id = SecureRandom.uuid
  record_id = redis_id + ':definition'
  self.created_at = Time.now.to_f

  # Actually save the record:
  redis.set record_id, serialize

  # Transition into the first phase:
  add_transition to: current_phase, timestamp: Time.now.utc.to_f

  # Add our id to the list of active flows:
  redis.sadd 'distribot.flows.active', id
  redis.incr('distribot.flows.running')

  # Announce our arrival to the rest of the system:
  Distribot.publish! 'distribot.flow.created', flow_id: id

  wait_for_flow_to_finish(block) if block_given?
  self
end
#stubbornly(task, &block) ⇒ Object
154 155 156 157 158 159 160 161 162 163 |
# File 'lib/distribot/flow.rb', line 154 def stubbornly(task, &block) loop do begin return block.call rescue NoMethodError => e warn "Error during #{task}: #{e} --- #{e.backtrace.join("\n")}" sleep 1 end end end |
#transition_to!(phase) ⇒ Object
115 116 117 118 119 120 121 122 123 124 |
# File 'lib/distribot/flow.rb', line 115 def transition_to!(phase) previous_transition = transitions.last prev = previous_transition ? previous_transition[:to] : nil add_transition(from: prev, to: phase, timestamp: Time.now.utc.to_f) Distribot.publish!( 'distribot.flow.phase.started', flow_id: id, phase: phase ) end |
#transitions ⇒ Object
130 131 132 133 134 |
# File 'lib/distribot/flow.rb', line 130 def transitions redis.smembers(redis_id + ':transitions').map do |item| OpenStruct.new JSON.parse(item, symbolize_names: true) end.sort_by(&:timestamp) end |