Class: Rworkflow::SidekiqFlow

Inherits:
Flow
  • Object
show all
Defined in:
lib/rworkflow/sidekiq_flow.rb

Constant Summary collapse

STATE_POLICY_GATED =
:gated
MAX_EXPECTED_DURATION =
4.hours

Constants inherited from Flow

Flow::REDIS_NS, Flow::STATES_FAILED, Flow::STATES_TERMINAL, Flow::STATE_FAILED, Flow::STATE_SUCCESSFUL, Flow::WORKFLOW_REGISTRY

Instance Attribute Summary

Attributes inherited from Flow

#id, #lifecycle

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Flow

all, #cleaned_up?, cleanup, #count, #counters, #created_at, #failed?, failure?, #fetch, #finish_time, #finished?, generate_id, #get, get_private_workflows, get_public_workflows, #get_state_cardinality, #incr, #list_objects, load, #log, #logger, #logging?, #logs, #metadata_string, #name, #name=, #public?, read_flow_class, register, registered?, registry, serializer, #set, #start, #start_time, #started?, #states_list, #successful?, terminal?, #terminate, #total_objects, #total_objects_failed, #total_objects_processed, #transition, unregister, #valid?

Constructor Details

#initialize(id) ⇒ SidekiqFlow

Returns a new instance of SidekiqFlow.



6
7
8
9
# File 'lib/rworkflow/sidekiq_flow.rb', line 6

# Builds the flow through Flow#initialize and then attaches the set that
# records which gated states currently have their gate open.
#
# @param id the flow identifier, forwarded unchanged to the parent constructor
def initialize(id)
  super
  # NOTE(review): @redis_key is presumably assigned by Flow#initialize -- confirm.
  gates_key = "#{@redis_key}__open_gates"
  @open_gates = RedisRds::Set.new(gates_key)
end

Class Method Details

.build_flow_map ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/rworkflow/sidekiq_flow.rb', line 135

# Walks every Sidekiq queue named by SidekiqHelper.queue_sizes and counts the
# enqueued jobs whose class is Rworkflow::Worker or one of its descendants.
#
# The first job argument is used as the flow id and the second as the state
# name.
#
# @return [Hash] flow id => { state name => number of enqueued jobs }
def build_flow_map
  jobs_by_flow = {}

  SidekiqHelper.queue_sizes.keys.each do |queue_name|
    Sidekiq::Queue.new(queue_name).each do |job|
      worker_class = begin
        job.klass.constantize
      rescue NameError
        nil # the job names a class that cannot be resolved here
      end

      # `<=` yields nil for unrelated classes, so both nil and false skip the job.
      next if worker_class.nil? || !(worker_class <= Rworkflow::Worker)

      flow_id = job.args.first
      state_name = job.args.second
      per_state = (jobs_by_flow[flow_id] ||= {})
      per_state[state_name] = per_state.fetch(state_name, 0) + 1
    end
  end

  jobs_by_flow
end

.cleanup_broken_flows ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/rworkflow/sidekiq_flow.rb', line 104

# Finds flows that should no longer exist, cleans each one up and prints a
# line per flow plus a final summary.
#
# A flow is considered broken when it is:
# * invalid,
# * valid, finished and not public, or
# * valid, never started and created more than one day ago.
#
# All flows are classified first; cleanup only starts afterwards.
def cleanup_broken_flows
  doomed = self.all.each_with_object([]) do |flow, found|
    reason =
      if !flow.valid?
        'invalid'
      elsif flow.finished? && !flow.public?
        'finished'
      elsif !flow.started? && flow.created_at < 1.day.ago
        'never started'
      end
    found << [flow, reason] unless reason.nil?
  end

  doomed.each do |flow, reason|
    flow.cleanup
    puts "Cleaned up #{reason} flow #{flow.id}"
  end
  puts ">>> Cleaned up #{doomed.size} broken flows <<<"
end

.create(lifecycle, name = '', options) ⇒ Object



99
100
101
102
# File 'lib/rworkflow/sidekiq_flow.rb', line 99

# Creates a workflow by delegating straight to the parent implementation.
#
# @param lifecycle forwarded unchanged to the parent `create`
# @param name forwarded unchanged; defaults to an empty string
# @param options forwarded unchanged
# @return whatever the parent `create` returns
def create(lifecycle, name = '', options)
  super(lifecycle, name, options)
end

.create_missing_jobs(flow, state_map) ⇒ Object



159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/rworkflow/sidekiq_flow.rb', line 159

# Compares the objects a flow holds per state with the jobs already enqueued
# for that state and creates jobs for any shortfall.
#
# Terminal states and the :processing counter are skipped. The number of
# objects already covered is the enqueued job count multiplied by the state's
# cardinality.
#
# @param flow the flow to repair; must respond to counters,
#   get_state_cardinality, create_jobs and id
# @param state_map [Hash] state name => number of jobs currently enqueued
def create_missing_jobs(flow, state_map)
  flow.counters.each do |state, num_objects|
    next if flow.class.terminal?(state) || state == :processing

    jobs_enqueued = state_map.fetch(state, 0)
    shortfall = num_objects - (jobs_enqueued * flow.get_state_cardinality(state))
    next unless shortfall > 0

    flow.create_jobs(state, shortfall)
    puts "Created #{shortfall} missing jobs for state #{state} in flow #{flow.id}"
  end
end

.enqueue_missing_jobs ⇒ Object



126
127
128
129
130
131
132
133
# File 'lib/rworkflow/sidekiq_flow.rb', line 126

# Re-creates jobs that are missing for every flow that is valid, not finished
# and not paused, based on what build_flow_map reports as enqueued.
#
# The set of running flows is determined in full before any job is created.
def enqueue_missing_jobs
  jobs_by_flow = build_flow_map
  active_flows = self.all.reject { |flow| !flow.valid? || flow.finished? || flow.paused? }
  active_flows.each do |flow|
    create_missing_jobs(flow, jobs_by_flow.fetch(flow.id, {}))
  end
end

Instance Method Details

#cleanup ⇒ Object



11
12
13
14
# File 'lib/rworkflow/sidekiq_flow.rb', line 11

# Runs the parent cleanup and then deletes the set of open gates that this
# subclass keeps in addition to the parent's data.
#
# @return the result of deleting the open-gates set
def cleanup
  super
  @open_gates.delete
end

#close_gate(state_name) ⇒ Object



94
95
96
# File 'lib/rworkflow/sidekiq_flow.rb', line 94

# Closes the gate for +state_name+ by removing it from the set of open gates.
# No jobs are created or cancelled here; gated? will report the state as gated
# again if its policy is STATE_POLICY_GATED.
#
# @param state_name the name of the state whose gate is closed
# @return the result of the underlying set removal
def close_gate(state_name)
  @open_gates.remove(state_name)
end

#continue ⇒ Object

for now assumes



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/rworkflow/sidekiq_flow.rb', line 44

# Undoes one pause. Nothing happens unless the flow is valid, unfinished and
# currently paused. When the pause counter drops to exactly zero, jobs are
# created for every counter that is neither a terminal state nor :processing.
#
# Any StandardError is logged through Rails.logger and not re-raised.
def continue
  return if finished? || !valid? || !paused?
  # Pauses nest: only the call that brings the counter back to zero resumes work.
  return unless @flow_data.decr(:paused) == 0

  runnable = counters.reject { |name, _| self.class.terminal?(name) || name == :processing }

  # enqueue jobs
  Hash[runnable].each { |state_name, num_objects| create_jobs(state_name, num_objects) }
rescue StandardError => e
  Rails.logger.error("Error continuing flow #{id}: #{e.message}")
end

#create_jobs(state_name, num_objects) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/rworkflow/sidekiq_flow.rb', line 56

# Enqueues worker jobs for +num_objects+ objects sitting in +state_name+.
#
# Nothing is enqueued when the flow is paused, when +num_objects+ is below
# one, when the state is terminal or when the state is gated.
#
# The worker is taken from the state when it exposes +worker_class+; otherwise
# the state name itself is resolved as a class name. An unresolvable name is
# logged and no jobs are created.
#
# @param state_name the state to create jobs for
# @param num_objects the number of objects that need processing
# @return [Integer, nil] the number of jobs enqueued, or nil when nothing was done
def create_jobs(state_name, num_objects)
  return if paused? || num_objects < 1 || self.class.terminal?(state_name) || gated?(state_name)

  state = @lifecycle.states[state_name]
  worker_class =
    if state.respond_to?(:worker_class)
      state.worker_class
    else
      begin
        state_name.constantize
      rescue NameError
        Rails.logger.error("Trying to push to a non existent worker class #{state_name} in workflow #{@id}")
        nil
      end
    end
  return if worker_class.nil?

  per_job = get_state_cardinality(state_name)

  job_count =
    if state.policy == State::STATE_POLICY_WAIT
      # Only full batches are enqueued: the size of get_state_list is added to
      # the new objects and the quotient is rounded down.
      backlog = num_objects + get_state_list(state_name).size
      (backlog / per_job.to_f).floor
    else
      # A partial batch still gets a job, hence rounding up.
      (num_objects / per_job.to_f).ceil
    end

  job_count.times { worker_class.enqueue_job(@id, state_name) }
end

#expected_duration ⇒ Object



24
25
26
# File 'lib/rworkflow/sidekiq_flow.rb', line 24

# The expected duration of a flow of this class.
#
# @return the MAX_EXPECTED_DURATION constant
def expected_duration
  MAX_EXPECTED_DURATION
end

#gated?(state_name) ⇒ Boolean

Returns:

  • (Boolean)


83
84
85
86
# File 'lib/rworkflow/sidekiq_flow.rb', line 83

# Tells whether jobs for +state_name+ are currently held back by a gate.
#
# @param state_name the name of a state present in the lifecycle
# @return [Boolean] true only when the state's policy is STATE_POLICY_GATED
#   and the state is not in the set of open gates
def gated?(state_name)
  policy = @lifecycle.states[state_name].policy
  return false unless policy == STATE_POLICY_GATED

  !@open_gates.include?(state_name)
end

#open_gate(state_name) ⇒ Object



88
89
90
91
92
# File 'lib/rworkflow/sidekiq_flow.rb', line 88

# Opens the gate for +state_name+ and creates jobs for the objects counted in
# that state.
#
# The gate is recorded as open before the count is taken.
#
# @param state_name the name of the gated state to release
# @return whatever create_jobs returns
def open_gate(state_name)
  @open_gates.add(state_name)
  create_jobs(state_name, count(state_name))
end

#pause ⇒ Object



36
37
38
39
40
41
# File 'lib/rworkflow/sidekiq_flow.rb', line 36

# Pauses the flow by incrementing its :paused counter. A finished flow is left
# untouched.
#
# Any StandardError is logged through Rails.logger and not re-raised.
#
# @return the result of the increment, or nil when the flow is finished
def pause
  @flow_data.incr(:paused) unless finished?
rescue StandardError => e
  Rails.logger.error("Error pausing flow #{id}: #{e.message}")
end

#paused? ⇒ Boolean

Returns:

  • (Boolean)


28
29
30
# File 'lib/rworkflow/sidekiq_flow.rb', line 28

# Tells whether the flow is currently paused.
#
# @return [Boolean] true when the stored :paused value, converted with to_i,
#   is greater than zero
def paused?
  @flow_data.get(:paused).to_i.positive?
end

#push(objects, name) ⇒ Object



16
17
18
19
20
21
22
# File 'lib/rworkflow/sidekiq_flow.rb', line 16

# Pushes +objects+ into state +name+ through the parent implementation and
# then creates jobs for the objects that were pushed.
#
# The push is best effort: a StandardError raised by the parent is logged
# through Rails.logger and reported as zero objects pushed. Exceptions outside
# StandardError (Interrupt, SystemExit, ...) are deliberately not intercepted,
# which is why no `return` is placed inside an `ensure` clause here -- that
# would discard every in-flight exception.
#
# @param objects forwarded unchanged to the parent push
# @param name the state the objects are pushed into
# @return the count reported by the parent push, or 0 when it raised a
#   StandardError
def push(objects, name)
  pushed = 0
  begin
    pushed = super(objects, name)
  rescue StandardError => e
    Rails.logger.error("Error pushing to state #{name} in flow #{@id}: #{e.message}")
  end

  create_jobs(name, pushed) if pushed > 0
  pushed
end

#status ⇒ Object



32
33
34
# File 'lib/rworkflow/sidekiq_flow.rb', line 32

# Human-readable status of the flow.
#
# @return 'Paused' while the flow is paused, otherwise the parent's status
def status
  if paused?
    'Paused'
  else
    super()
  end
end