Class: Rworkflow::SidekiqFlow
- Inherits:
-
Flow
- Object
- Flow
- Rworkflow::SidekiqFlow
show all
- Defined in:
- lib/rworkflow/sidekiq_flow.rb
Constant Summary
collapse
- STATE_POLICY_GATED = :gated
- MAX_EXPECTED_DURATION = 4.hours
Constants inherited
from Flow
Flow::REDIS_NS, Flow::STATES_FAILED, Flow::STATES_TERMINAL, Flow::STATE_FAILED, Flow::STATE_SUCCESSFUL, Flow::WORKFLOW_REGISTRY
Instance Attribute Summary
Attributes inherited from Flow
#id, #lifecycle
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from Flow
all, #cleaned_up?, cleanup, #count, #counters, #created_at, #failed?, failure?, #fetch, #finish_time, #finished?, generate_id, #get, get_private_workflows, get_public_workflows, #get_state_cardinality, #incr, #list_objects, load, #log, #logger, #logging?, #logs, #metadata_string, #name, #name=, #public?, read_flow_class, register, registered?, registry, serializer, #set, #start, #start_time, #started?, #states_list, #successful?, terminal?, #terminate, #total_objects, #total_objects_failed, #total_objects_processed, #transition, unregister, #valid?
Constructor Details
Returns a new instance of SidekiqFlow.
6
7
8
9
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 6
# Builds the flow through Flow#initialize, then attaches the RedisRds::Set
# used to remember which gated states have been opened.
#
# @param id the flow identifier, forwarded unchanged to the superclass
def initialize(id)
  super(id)
  # The key is derived from @redis_key, presumably assigned by the
  # superclass constructor (not visible here).
  open_gates_key = "#{@redis_key}__open_gates"
  @open_gates = RedisRds::Set.new(open_gates_key)
end
|
Class Method Details
.build_flow_map ⇒ Object
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 135
# Counts the workflow jobs currently enqueued in Sidekiq, grouped by flow
# and by state.
#
# Every queue name reported by SidekiqHelper.queue_sizes is scanned. A job
# is counted only when its class name resolves to Rworkflow::Worker or one
# of its descendants; class names that cannot be resolved are skipped.
#
# @return [Hash] { flow id => { state name => number of queued jobs } },
#   where flow id and state name are the job's first and second arguments
def build_flow_map
  jobs_by_flow = {}
  SidekiqHelper.queue_sizes.keys.each do |queue_name|
    Sidekiq::Queue.new(queue_name).each do |job|
      worker = begin
        job.klass.constantize
      rescue NameError
        nil # class name unknown to this process
      end
      # `<=` yields nil for unrelated classes, so both nil and false mean "skip".
      next if worker.nil? || !(worker <= Rworkflow::Worker)

      flow_id, state_name = job.args
      per_state = (jobs_by_flow[flow_id] ||= {})
      per_state[state_name] = per_state.fetch(state_name, 0) + 1
    end
  end
  jobs_by_flow
end
|
.cleanup_broken_flows ⇒ Object
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 104
# Cleans up every flow considered broken and prints a report to stdout.
#
# A flow is broken when it is:
# * 'invalid'       - valid? is false;
# * 'finished'      - finished but not public;
# * 'never started' - not started and created more than one day ago.
#
# All flows are classified first; cleanup only begins afterwards.
#
# @return [nil] the result of the final puts
def cleanup_broken_flows
  broken = self.all.map do |flow|
    reason =
      if !flow.valid?
        'invalid'
      elsif flow.finished? && !flow.public?
        'finished'
      elsif !flow.started? && flow.created_at < 1.day.ago
        'never started'
      end
    [flow, reason] if reason
  end.compact

  broken.each do |flow, reason|
    flow.cleanup
    puts "Cleaned up #{reason} flow #{flow.id}"
  end
  puts ">>> Cleaned up #{broken.size} broken flows <<<"
end
|
.create(lifecycle, name = '', options) ⇒ Object
99
100
101
102
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 99
# Creates a workflow by delegating to the superclass implementation with the
# same three arguments.
#
# @param lifecycle forwarded unchanged
# @param name forwarded unchanged; defaults to '' when only two arguments
#   are supplied (the second one then binds to +options+)
# @param options forwarded unchanged
# @return whatever the superclass create returns
def create(lifecycle, name = '', options)
  super(lifecycle, name, options)
end
|
.create_missing_jobs(flow, state_map) ⇒ Object
159
160
161
162
163
164
165
166
167
168
169
170
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 159
# Enqueues jobs for objects of +flow+ that are not covered by a queued job.
#
# For each counter of the flow (terminal states and :processing are
# skipped), the number of objects covered by queued jobs is taken to be
# "queued jobs for the state * state cardinality". Any shortfall is passed
# to flow.create_jobs and reported on stdout.
#
# @param flow the flow to repair
# @param state_map [Hash] state name => number of jobs already queued
# @return the flow's counters collection (the result of iterating it)
def create_missing_jobs(flow, state_map)
  flow.counters.each do |state, num_objects|
    next if flow.class.terminal?(state) || state == :processing

    queued_jobs = state_map.fetch(state, 0)
    covered = queued_jobs * flow.get_state_cardinality(state)
    missing = num_objects - covered
    next unless missing > 0

    flow.create_jobs(state, missing)
    puts "Created #{missing} missing jobs for state #{state} in flow #{flow.id}"
  end
end
|
.enqueue_missing_jobs ⇒ Object
126
127
128
129
130
131
132
133
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 126
# Re-creates jobs that running flows need but that are absent from the
# Sidekiq queues.
#
# The queue snapshot from build_flow_map is taken first. Then every flow
# that is valid, unfinished and not paused is handed to create_missing_jobs
# together with its own slice of the snapshot (an empty hash when the flow
# has no queued jobs).
#
# @return [Array] the flows that were checked
def enqueue_missing_jobs
  queued_by_flow = build_flow_map
  active_flows = self.all.reject { |flow| !flow.valid? || flow.finished? || flow.paused? }
  active_flows.each do |flow|
    create_missing_jobs(flow, queued_by_flow.fetch(flow.id, {}))
  end
end
|
Instance Method Details
#cleanup ⇒ Object
11
12
13
14
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 11
# Runs the superclass cleanup, then deletes this flow's open-gates set.
#
# @return the result of @open_gates.delete
def cleanup
  super()
  @open_gates.delete
end
|
#close_gate(state_name) ⇒ Object
94
95
96
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 94
# Removes +state_name+ from the set of open gates. For a state whose policy
# is STATE_POLICY_GATED, gated? reports true again afterwards.
#
# @param state_name the state whose gate is closed
# @return the result of @open_gates.remove
def close_gate(state_name)
  @open_gates.remove(state_name)
end
|
#continue ⇒ Object
44
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 44
# Undoes one pause. Does nothing when the flow is finished, invalid or not
# paused.
#
# The :paused counter in @flow_data is decremented. Only when it reaches
# zero are jobs created for every counter entry whose state is neither
# terminal nor :processing. Any StandardError is logged through
# Rails.logger and not re-raised.
#
# @return [Hash, nil] the resumed state => object-count pairs, or nil when
#   nothing was resumed (on error: the logger's return value)
def continue
  return if finished? || !valid? || !paused?
  # Other pauses are still outstanding; stay paused.
  return unless @flow_data.decr(:paused) == 0

  resumable = counters.reject { |name, _| self.class.terminal?(name) || name == :processing }.to_h
  resumable.each { |state_name, num_objects| create_jobs(state_name, num_objects) }
rescue StandardError => e
  Rails.logger.error("Error continuing flow #{self.id}: #{e.message}")
end
|
#create_jobs(state_name, num_objects) ⇒ Object
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 56
def create_jobs(state_name, num_objects)
return if paused? || num_objects < 1 || self.class.terminal?(state_name) || gated?(state_name)
state = @lifecycle.states[state_name]
worker_class = if state.respond_to?(:worker_class)
state.worker_class
else
begin
state_name.constantize
rescue NameError => _
Rails.logger.error("Trying to push to a non existent worker class #{state_name} in workflow #{@id}")
nil
end
end
if !worker_class.nil?
cardinality = get_state_cardinality(state_name)
amount = if state.policy == State::STATE_POLICY_WAIT
((num_objects + get_state_list(state_name).size) / cardinality.to_f).floor
else
(num_objects / cardinality.to_f).ceil
end
amount.times { worker_class.enqueue_job(@id, state_name) }
end
end
|
#expected_duration ⇒ Object
24
25
26
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 24
# @return the value of the MAX_EXPECTED_DURATION constant
def expected_duration
  MAX_EXPECTED_DURATION
end
|
#gated?(state_name) ⇒ Boolean
83
84
85
86
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 83
def gated?(state_name)
state = @lifecycle.states[state_name]
return state.policy == STATE_POLICY_GATED && !@open_gates.include?(state_name)
end
|
#open_gate(state_name) ⇒ Object
88
89
90
91
92
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 88
# Opens the gate of +state_name+ and creates jobs for the objects counted
# in that state.
#
# The gate is recorded as open before the objects are counted, so
# create_jobs no longer sees the state as gated.
#
# @param state_name the state whose gate is opened
# @return the result of create_jobs
def open_gate(state_name)
  @open_gates.add(state_name)
  create_jobs(state_name, count(state_name))
end
|
#pause ⇒ Object
36
37
38
39
40
41
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 36
# Pauses the flow by incrementing the :paused counter in @flow_data.
# Finished flows are left untouched. Any StandardError is logged through
# Rails.logger and not re-raised.
#
# @return the result of the increment, nil when the flow is finished, or
#   the logger's return value on error
def pause
  @flow_data.incr(:paused) unless finished?
rescue StandardError => e
  Rails.logger.error("Error pausing flow #{self.id}: #{e.message}")
end
|
#paused? ⇒ Boolean
28
29
30
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 28
# @return [Boolean] true when the :paused value stored in @flow_data,
#   converted with to_i, is greater than zero
def paused?
  @flow_data.get(:paused).to_i.positive?
end
|
#push(objects, name) ⇒ Object
16
17
18
19
20
21
22
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 16
# Pushes +objects+ into state +name+ through the superclass, then creates
# jobs for whatever was actually pushed.
#
# A StandardError raised by the superclass push is treated as "nothing was
# pushed": it is logged through Rails.logger and 0 is returned. The former
# implementation achieved that with a `return` inside `ensure`, which also
# silently discarded non-standard exceptions (Interrupt, SystemExit, ...)
# and left no trace of the failure. Those now propagate to the caller.
#
# @param objects the objects to push, forwarded unchanged
# @param name the state receiving the objects
# @return [Integer] the number of objects pushed, 0 when the push failed
def push(objects, name)
  pushed = begin
    super(objects, name)
  rescue StandardError => e
    Rails.logger.error("Error pushing to state #{name} in flow #{@id}: #{e.message}")
    0
  end
  # Kept outside the rescue on purpose: an error while creating jobs still
  # reaches the caller, as it did before.
  create_jobs(name, pushed) if pushed > 0
  pushed
end
|
#status ⇒ Object
32
33
34
|
# File 'lib/rworkflow/sidekiq_flow.rb', line 32
# @return 'Paused' while the flow is paused, otherwise the superclass status
def status
  return 'Paused' if paused?

  super()
end
|