Class: Rworkflow::Flow

Inherits:
Object
  • Object
show all
Defined in:
lib/rworkflow/flow.rb

Direct Known Subclasses

SidekiqFlow

Constant Summary collapse

# Symbols naming the two terminal states.
STATE_SUCCESSFUL = :successful
STATE_FAILED = :failed
# Every terminal state (failed first, then successful).
STATES_TERMINAL = [STATE_FAILED, STATE_SUCCESSFUL].freeze
# The subset of terminal states that count as failures.
STATES_FAILED = [STATE_FAILED].freeze
# Namespace prefix used when building the flow's Redis key.
REDIS_NS = 'flow'.freeze
# Registry key derived from the namespace prefix.
WORKFLOW_REGISTRY = "#{REDIS_NS}:__registry".freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id) ⇒ Flow

Returns a new instance of Flow.



14
15
16
17
18
19
20
21
22
23
# File 'lib/rworkflow/flow.rb', line 14

# Builds a flow handle for the given workflow id.
#
# Derives the Redis key from REDIS_NS and the id, creates the three RedisRds
# hashes kept under that key (main storage, "__data" and "__processing"),
# then calls load_lifecycle.
#
# @param id the workflow identifier
def initialize(id)
  @id = id
  @redis_key = "#{REDIS_NS}:#{id}"

  @storage = RedisRds::Hash.new(@redis_key)
  @flow_data, @processing = %w[data processing].map do |suffix|
    RedisRds::Hash.new("#{@redis_key}__#{suffix}")
  end

  load_lifecycle
end

Instance Attribute Details

#id ⇒ Object

Returns the value of attribute id.



11
12
13
# File 'lib/rworkflow/flow.rb', line 11

# Reader for the workflow id stored in @id by the constructor.
#
# @return the value of @id
def id
  @id
end

#lifecycle ⇒ Object

Returns the value of attribute lifecycle.



12
13
14
# File 'lib/rworkflow/flow.rb', line 12

# Reader for @lifecycle.
# NOTE(review): @lifecycle is presumably populated by load_lifecycle or the
# lifecycle= writer, neither of which is visible here — confirm.
#
# @return the value of @lifecycle (nil when none is set; see #valid?)
def lifecycle
  @lifecycle
end

Class Method Details

.all(options = {}) ⇒ Object



393
394
395
# File 'lib/rworkflow/flow.rb', line 393

# Loads every workflow the registry lists.
#
# @param options [Hash] forwarded to registry.all; :parent_class defaults to self
# @return [Array] the result of load for each id the registry returned
def all(options = {})
  registry_options = options.reverse_merge(parent_class: self)
  registry.all(registry_options).map { |flow_id| load(flow_id) }
end

.cleanup(id) ⇒ Object



380
381
382
383
# File 'lib/rworkflow/flow.rb', line 380

# Instantiates the workflow with the given id and runs its instance-level cleanup.
#
# @param id the workflow identifier
# @return whatever the instance's cleanup returns
def cleanup(id)
  new(id).cleanup
end

.create(lifecycle, name = '', options = {}) ⇒ Object



359
360
361
362
363
364
365
366
367
368
369
370
371
# File 'lib/rworkflow/flow.rb', line 359

# Creates, initialises and registers a new workflow.
#
# Generates an id from the name, assigns name and lifecycle, stores
# :created_at (current epoch seconds), :public and :logging, then registers
# the workflow.
#
# @param lifecycle the lifecycle assigned to the new workflow
# @param name [String] workflow name, also used to generate the id
# @param options [Hash] :public (default false) and :logging (default true)
# @return the newly created workflow
def create(lifecycle, name = '', options = {})
  new(generate_id(name)).tap do |flow|
    flow.name = name
    flow.lifecycle = lifecycle
    flow.set(:created_at, Time.now.to_i)
    flow.set(:public, options.fetch(:public, false))
    flow.set(:logging, options.fetch(:logging, true))

    register(flow)
  end
end

.failure?(state) ⇒ Boolean

Returns:

  • (Boolean)


436
437
438
# File 'lib/rworkflow/flow.rb', line 436

# Tells whether the given state is listed in the receiver's STATES_FAILED.
#
# @param state the state to check
# @return [Boolean]
def failure?(state)
  self::STATES_FAILED.include?(state)
end

.generate_id(workflow_name) ⇒ Object



373
374
375
376
377
# File 'lib/rworkflow/flow.rb', line 373

def generate_id(workflow_name)
  now = Time.now.to_f
  random = Random.new(now)
  return "#{name}__#{workflow_name}__#{(Time.now.to_f * 1000).to_i}__#{random.rand(now).to_i}"
end

.get_private_workflows(options = {}) ⇒ Object



389
390
391
# File 'lib/rworkflow/flow.rb', line 389

# Loads every workflow the registry reports as private.
#
# @param options [Hash] forwarded to registry.private_flows; :parent_class defaults to self
# @return [Array] the result of load for each id the registry returned
def get_private_workflows(options = {})
  registry_options = options.reverse_merge(parent_class: self)
  registry.private_flows(registry_options).map { |flow_id| load(flow_id) }
end

.get_public_workflows(options = {}) ⇒ Object



385
386
387
# File 'lib/rworkflow/flow.rb', line 385

# Loads every workflow the registry reports as public.
#
# @param options [Hash] forwarded to registry.public_flows; :parent_class defaults to self
# @return [Array] the result of load for each id the registry returned
def get_public_workflows(options = {})
  registry_options = options.reverse_merge(parent_class: self)
  registry.public_flows(registry_options).map { |flow_id| load(flow_id) }
end

.load(id, klass = nil) ⇒ Object



397
398
399
400
401
402
403
# File 'lib/rworkflow/flow.rb', line 397

# Instantiates the workflow with the given id.
#
# @param id the workflow identifier
# @param klass the class to instantiate; when nil it is derived from the id
#   via read_flow_class
# @return the new workflow, or nil when the resolved class does not respond to :new
def load(id, klass = nil)
  flow_class = klass.nil? ? read_flow_class(id) : klass
  flow_class.respond_to?(:new) ? flow_class.new(id) : nil
end

.read_flow_class(id) ⇒ Object



405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/rworkflow/flow.rb', line 405

# Resolves the flow class encoded in a workflow id.
#
# The class name is the part of the id before the first "__". It is turned
# into a constant with String#constantize; an unknown name is logged as a
# warning and yields nil.
#
# @param id [String] the workflow identifier
# @return the resolved constant, or nil when the id has no class segment or
#   the constant lookup raises NameError
def read_flow_class(id)
  class_name = id.split('__').first
  return nil if class_name.nil?

  begin
    class_name.constantize
  rescue NameError
    Rails.logger.warn("Unknown flow class for workflow id #{id}")
    nil
  end
end

.register(workflow) ⇒ Object



424
425
426
# File 'lib/rworkflow/flow.rb', line 424

# Adds the workflow to the registry.
#
# @param workflow the workflow to register
# @return whatever registry.add returns
def register(workflow)
  registry.add(workflow)
end

.registered?(workflow) ⇒ Boolean

Returns:

  • (Boolean)


420
421
422
# File 'lib/rworkflow/flow.rb', line 420

# Asks the registry whether it includes the given workflow.
#
# @param workflow the workflow to look up
# @return [Boolean] the registry's include? answer
def registered?(workflow)
  registry.include?(workflow)
end

.registry ⇒ Object



440
441
442
443
444
# File 'lib/rworkflow/flow.rb', line 440

# Returns the memoized FlowRegistry, created on first use from the
# stringified Rworkflow::VERSION.
#
# @return [FlowRegistry]
def registry
  @registry ||= FlowRegistry.new(Rworkflow::VERSION.to_s)
end

.serializer ⇒ Object



446
447
448
# File 'lib/rworkflow/flow.rb', line 446

# Returns the module used to dump and load stored values; callers in this
# class invoke .dump and .load on it.
#
# @return [Module] YAML
def serializer
  YAML
end

.terminal?(state) ⇒ Boolean

Returns:

  • (Boolean)


432
433
434
# File 'lib/rworkflow/flow.rb', line 432

# Tells whether the given state is listed in the receiver's STATES_TERMINAL.
#
# @param state the state to check
# @return [Boolean]
def terminal?(state)
  self::STATES_TERMINAL.include?(state)
end

.unregister(workflow) ⇒ Object



428
429
430
# File 'lib/rworkflow/flow.rb', line 428

# Removes the workflow from the registry.
#
# @param workflow the workflow to unregister
# @return whatever registry.remove returns
def unregister(workflow)
  registry.remove(workflow)
end

Instance Method Details

#cleaned_up? ⇒ Boolean

Returns:

  • (Boolean)


204
205
206
# File 'lib/rworkflow/flow.rb', line 204

# Tells whether none of the lists for the states in states_list exists any more.
#
# @return [Boolean] true when no state list reports exists?
def cleaned_up?
  states_list.none? { |state_name| get_list(state_name).exists? }
end

#cleanup ⇒ Object



296
297
298
299
300
301
302
303
304
# File 'lib/rworkflow/flow.rb', line 296

# Deletes the data this flow keeps and removes it from the registry.
#
# Does nothing when Rails.env.test? is true. Otherwise it runs, in order:
# states_cleanup, deletion of the logger hash, deletion of the processing
# hash, unregistration from the class registry, and deletion of the
# flow-data and storage hashes.
def cleanup
  # Skipped entirely in the Rails test environment.
  return if Rails.env.test?
  states_cleanup
  logger.delete
  @processing.delete
  self.class.unregister(self)
  @flow_data.delete
  @storage.delete
end

#count(state) ⇒ Object



89
90
91
# File 'lib/rworkflow/flow.rb', line 89

# Returns the size of the list for the given state.
#
# @param state the state name
# @return the list's size
def count(state)
  get_list(state).size
end

#counters ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/rworkflow/flow.rb', line 93

# Returns the flow's counters.
#
# Uses the serialized snapshot stored under :counters when one exists and can
# be deserialized; otherwise falls back to counters!. A deserialization error
# is logged and treated as a missing snapshot.
#
# @return the stored counters, or the result of counters!
def counters
  serialized = @storage.get(:counters)
  return counters! if serialized.nil?

  stored = begin
    self.class.serializer.load(serialized)
  rescue => e
    Rails.logger.error("Error loading stored flow counters: #{e.message}")
    nil
  end
  stored || counters!
end

#created_at ⇒ Object



57
58
59
# File 'lib/rworkflow/flow.rb', line 57

# Returns the creation time, memoized in @created_at.
#
# Built with Time.zone.at from the stored :created_at value (0 when unset).
#
# @return the time object produced by Time.zone.at
def created_at
  @created_at ||= Time.zone.at(get(:created_at, 0))
end

#expected_duration ⇒ Object



81
82
83
# File 'lib/rworkflow/flow.rb', line 81

# Returns the expected duration of the flow; this implementation always
# answers infinity.
#
# @return [Float] Float::INFINITY
def expected_duration
  Float::INFINITY
end

#failed? ⇒ Boolean

Returns:

  • (Boolean)


349
350
351
352
# File 'lib/rworkflow/flow.rb', line 349

# Tells whether the flow has finished with at least one failed object.
#
# @return [Boolean] false while the flow is not finished
def failed?
  finished? && total_objects_failed.positive?
end

#fetch(fetcher_id, state_name) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/rworkflow/flow.rb', line 127

# Pops a batch of objects from a state's list and yields them to the caller.
#
# The fetcher is marked in @processing for the duration of the call. The batch
# size comes from get_state_cardinality (previously the same computation was
# duplicated inline here). Raw entries that cannot be deserialized are pushed
# to STATE_FAILED and reported through Rails.logger; the remaining objects are
# yielded when a block is given.
#
# Whatever happens, the fetcher is removed from @processing afterwards and the
# flow is terminated if it is finished.
#
# @param fetcher_id identifier of the caller, used as key in @processing
# @param state_name name of the state to fetch from
# @yieldparam objects [Array] the successfully deserialized objects
# @return the block's result, or nil when there is no list, nothing was
#   popped, or no block was given
def fetch(fetcher_id, state_name)
  @processing.set(fetcher_id, 1)
  list = get_state_list(state_name)
  return if list.nil?

  cardinality = get_state_cardinality(state_name)
  # Second lpop argument is true only for states with the WAIT policy.
  # NOTE(review): presumably makes the pop all-or-nothing — confirm in RedisRds.
  force_list_complete = @lifecycle.states[state_name].policy == State::STATE_POLICY_WAIT
  raw_objects = list.lpop(cardinality, force_list_complete)
  return if raw_objects.empty?

  failed = []
  objects = raw_objects.map do |raw_object|
    begin
      self.class.serializer.load(raw_object)
    rescue StandardError
      failed << raw_object
      nil
    end
  end.compact
  # Replace the placeholder 1 with the number of objects actually handed out.
  @processing.set(fetcher_id, objects.size)

  unless failed.empty?
    push(failed, STATE_FAILED)
    Rails.logger.error("Failed to parse #{failed.size} in workflow #{@id} for fetcher id #{fetcher_id} at state #{state_name}")
  end

  yield(objects) if block_given?
ensure
  @processing.remove(fetcher_id)
  terminate if finished?
end

#finish_time ⇒ Object



77
78
79
# File 'lib/rworkflow/flow.rb', line 77

# Returns the finish time, built with Time.zone.at from the stored
# :finish_time value (0 when unset).
#
# @return the time object produced by Time.zone.at
def finish_time
  Time.zone.at(get(:finish_time, 0))
end

#finished? ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
44
45
46
47
48
# File 'lib/rworkflow/flow.rb', line 41

# Tells whether a started flow has no objects left outside terminal states.
#
# Adds up the counters of every non-terminal state (each converted with
# to_i); the flow is finished when that sum is zero.
#
# @return [Boolean] false when the flow has not been started
def finished?
  return false unless started?

  pending = counters.sum do |state, amount|
    self.class.terminal?(state) ? 0 : amount.to_i
  end
  pending.zero?
end

#get(key, default = nil) ⇒ Object



268
269
270
271
272
273
# File 'lib/rworkflow/flow.rb', line 268

# Reads a value from the flow-data hash.
#
# @param key the field to read
# @param default returned as-is when nothing is stored under key
# @return the deserialized stored value, or default
def get(key, default = nil)
  serialized = @flow_data.get(key)
  return default if serialized.nil?

  self.class.serializer.load(serialized)
end

#get_state_cardinality(state_name) ⇒ Object



258
259
260
261
262
# File 'lib/rworkflow/flow.rb', line 258

# Returns the cardinality configured for a state.
#
# When the lifecycle declares CARDINALITY_ALL_STARTED, the stored
# :start_count (converted with to_i) is returned instead.
#
# @param state_name name of the state in the lifecycle
# @return the declared cardinality, or the start count
def get_state_cardinality(state_name)
  declared = @lifecycle.states[state_name].cardinality
  return declared unless declared == Rworkflow::Lifecycle::CARDINALITY_ALL_STARTED

  get(:start_count).to_i
end

#incr(key, value = 1) ⇒ Object



275
276
277
# File 'lib/rworkflow/flow.rb', line 275

# Increments a field of the flow-data hash.
#
# @param key the field to increment
# @param value the amount to add (default 1)
# @return whatever the hash's incrby returns
def incr(key, value = 1)
  @flow_data.incrby(key, value)
end

#list_objects(state_name, limit = -1) ⇒ Object



160
161
162
163
# File 'lib/rworkflow/flow.rb', line 160

# Lists the deserialized objects held in a state's list.
#
# @param state_name name of the state
# @param limit end index passed to the list's get (default -1)
# @return [Array] the deserialized entries from index 0 to limit
def list_objects(state_name, limit = -1)
  raw_entries = get_list(state_name).get(0, limit)
  raw_entries.map { |raw| self.class.serializer.load(raw) }
end

#log(from_state, transition, num_objects) ⇒ Object



234
235
236
# File 'lib/rworkflow/flow.rb', line 234

# Records a transition in the flow's logger hash when logging is enabled.
#
# The counter key is "<from_state>__<transition>".
#
# @param from_state state the objects left
# @param transition name of the transition taken
# @param num_objects number of objects moved (converted with to_i)
# @return the logger's incrby result, or nil when logging is disabled
def log(from_state, transition, num_objects)
  return unless logging?

  logger.incrby("#{from_state}__#{transition}", num_objects.to_i)
end

#logger ⇒ Object



238
239
240
241
242
# File 'lib/rworkflow/flow.rb', line 238

# Returns the memoized RedisRds hash stored under "<redis key>__logger".
#
# @return [RedisRds::Hash]
def logger
  @logger ||= RedisRds::Hash.new("#{@redis_key}__logger")
end

#logging? ⇒ Boolean

Returns:

  • (Boolean)


230
231
232
# File 'lib/rworkflow/flow.rb', line 230

# Tells whether transition logging is enabled for this flow.
#
# @return the stored :logging value, or false when none is stored
def logging?
  get(:logging, false)
end

#logs ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/rworkflow/flow.rb', line 244

# Returns the logged transition counters grouped by state.
#
# Each logger key "<state>__<transition>" becomes
# result[state][transition] = counter (converted with to_i).
#
# @return [Hash] nested counters; empty when the flow is not valid or
#   logging is disabled
def logs
  return {} unless valid? && logging?

  logger.getall.each_with_object({}) do |(state_transition, counter), grouped|
    state, transition = state_transition.split('__')
    (grouped[state] ||= {})[transition] = counter.to_i
  end
end

#metadata_string ⇒ Object



200
201
202
# File 'lib/rworkflow/flow.rb', line 200

# Returns a short label for this flow: "Rworkflow: " followed by its name.
#
# The method name had been dropped from the `def` line, so the snippet did
# not define metadata_string.
#
# @return [String]
def metadata_string
  "Rworkflow: #{name}"
end

#name ⇒ Object



65
66
67
# File 'lib/rworkflow/flow.rb', line 65

# Returns the stored :name, falling back to the flow id when none is stored.
#
# @return the flow's name or @id
def name
  get(:name, @id)
end

#name=(name) ⇒ Object



69
70
71
# File 'lib/rworkflow/flow.rb', line 69

# Stores the flow's name under :name.
#
# @param name the new name
# @return whatever set returns
def name=(name)
  set(:name, name)
end

#public? ⇒ Boolean

Returns:

  • (Boolean)


354
355
356
# File 'lib/rworkflow/flow.rb', line 354

# Tells whether the flow is public, from the stored :public value (default false).
#
# Memoized with ||=, so only a truthy answer is cached; a false answer is
# read again on the next call.
#
# @return the stored :public value, or false
def public?
  @public ||= get(:public, false)
end

#set(key, value) ⇒ Object



264
265
266
# File 'lib/rworkflow/flow.rb', line 264

# Serializes a value and stores it in the flow-data hash.
#
# @param key the field to write
# @param value the value to serialize with the class serializer
# @return whatever the hash's set returns
def set(key, value)
  serialized = self.class.serializer.dump(value)
  @flow_data.set(key, serialized)
end

#start(objects) ⇒ Object



312
313
314
315
316
317
318
# File 'lib/rworkflow/flow.rb', line 312

# Starts the flow with the given objects.
#
# Records :start_time (current epoch seconds) and :start_count, pushes the
# objects to the lifecycle's initial state and logs them under the 'initial'
# transition.
#
# @param objects one object or a collection; wrapped with Array.wrap
# @return whatever log returns
def start(objects)
  batch = Array.wrap(objects)
  started_at = Time.now.to_i
  set(:start_time, started_at)
  set(:start_count, batch.size)
  push(batch, lifecycle.initial)
  log(lifecycle.initial, 'initial', batch.size)
end

#start_time ⇒ Object



73
74
75
# File 'lib/rworkflow/flow.rb', line 73

# Returns the start time, built with Time.zone.at from the stored
# :start_time value (0 when unset).
#
# @return the time object produced by Time.zone.at
def start_time
  Time.zone.at(get(:start_time, 0))
end

#started? ⇒ Boolean

Returns:

  • (Boolean)


61
62
63
# File 'lib/rworkflow/flow.rb', line 61

# Tells whether a :start_time has been stored for this flow.
#
# @return [Boolean]
def started?
  !get(:start_time).nil?
end

#states_list ⇒ Object



208
209
210
211
212
213
# File 'lib/rworkflow/flow.rb', line 208

# Lists the states of this flow: the class's terminal states, followed by the
# lifecycle's state names when the flow is valid.
#
# @return [Array] state names
def states_list
  terminal = self.class::STATES_TERMINAL
  valid? ? terminal + @lifecycle.states.keys : terminal
end

#status ⇒ Object



50
51
52
53
54
55
# File 'lib/rworkflow/flow.rb', line 50

# Returns a human-readable status for the flow.
#
# @return [String] 'Running' while unfinished, otherwise 'Finished' when
#   successful and 'Failed' when not
def status
  return 'Running' unless finished?

  successful? ? 'Finished' : 'Failed'
end

#successful? ⇒ Boolean

Returns:

  • (Boolean)


344
345
346
347
# File 'lib/rworkflow/flow.rb', line 344

# Tells whether the flow has finished without failing.
#
# @return [Boolean] false while the flow is not finished
def successful?
  finished? && !failed?
end

#terminate ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/rworkflow/flow.rb', line 178

# Finalises the flow inside a RedisRds::Mutex keyed on the flow id.
#
# Nothing happens when the flow is already cleaned up. Otherwise :finish_time
# is stored (current epoch seconds) and post_process is called. A public flow
# then stores a serialized snapshot of counters! (with :processing forced to 0)
# under :counters using setnx and runs states_cleanup; a non-public flow is
# removed with cleanup.
def terminate
  mutex = RedisRds::Mutex.new(self.id)
  mutex.synchronize do
    # Skip everything if the state lists are already gone.
    if !self.cleaned_up?
      set(:finish_time, Time.now.to_i)
      post_process

      if self.public?
        the_counters = counters!
        the_counters[:processing] = 0 # Some worker might have increased the processing flag at that time even if there is no more jobs to be done
        # NOTE(review): setnx presumably keeps an existing snapshot rather than overwriting it — confirm in RedisRds.
        @storage.setnx(:counters, self.class.serializer.dump(the_counters))
        states_cleanup
      else
        self.cleanup
      end
    end
  end
end

#total_objects(counters = nil) ⇒ Object



330
331
332
# File 'lib/rworkflow/flow.rb', line 330

# Adds up the counters of every state.
#
# @param counters state/count pairs to use; defaults to the flow's own counters
# @return the sum of all counts
def total_objects(counters = nil)
  tallies = counters || self.counters
  tallies.inject(0) { |running, (_state, amount)| running + amount }
end

#total_objects_failed(counters = nil) ⇒ Object



334
335
336
337
338
339
340
341
342
# File 'lib/rworkflow/flow.rb', line 334

# Adds up the counters of the states the class reports as failures.
#
# @param counters state/count pairs to use; defaults to the flow's own counters
# @return the sum of the counts of failure states
def total_objects_failed(counters = nil)
  tallies = counters || self.counters
  tallies.inject(0) do |running, (state, amount)|
    self.class.failure?(state) ? running + amount : running
  end
end

#total_objects_processed(counters = nil) ⇒ Object



320
321
322
323
324
325
326
327
328
# File 'lib/rworkflow/flow.rb', line 320

# Adds up the counters of the states the class reports as terminal.
#
# @param counters state/count pairs to use; defaults to the flow's own counters
# @return the sum of the counts of terminal states
def total_objects_processed(counters = nil)
  tallies = counters || self.counters
  tallies.inject(0) do |running, (state, amount)|
    self.class.terminal?(state) ? running + amount : running
  end
end

#transition(from_state, name, objects) ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/rworkflow/flow.rb', line 215

# Moves objects along a named transition of the lifecycle.
#
# Asks the lifecycle for the target state; a Rworkflow::StateError is logged
# and treated as "no target". When a target exists the objects are pushed to
# it and the transition is logged.
#
# @param from_state the state the objects are leaving
# @param name name of the transition
# @param objects one object or a collection; wrapped with Array.wrap
# @return whatever log returns, or nil when no target state was found
def transition(from_state, name, objects)
  batch = Array.wrap(objects)
  target = begin
    lifecycle.transition(from_state, name)
  rescue Rworkflow::StateError => e
    Rails.logger.error("Error transitioning: #{e}")
    nil
  end
  return if target.nil?

  push(batch, target)
  log(from_state, name, batch.size)
end

#valid? ⇒ Boolean

Returns:

  • (Boolean)


85
86
87
# File 'lib/rworkflow/flow.rb', line 85

# Tells whether the flow has a lifecycle.
#
# @return [Boolean] true when @lifecycle is not nil
def valid?
  !@lifecycle.nil?
end