Class: SidekiqWorkflows::Worker

Inherits:
Object
  • Object
show all
Includes:
Sidekiq::Worker
Defined in:
lib/sidekiq_workflows/worker.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.perform_async(workflow, *args) ⇒ Object



55
56
57
# File 'lib/sidekiq_workflows/worker.rb', line 55

# Enqueues this worker for +workflow+ on the queue returned by +worker_queue+.
#
# The job receives the workflow in its serialized form (+workflow.serialize+)
# followed by any extra arguments, unchanged.
#
# @param workflow [#serialize] workflow node to hand to the job
# @param args [Array] additional job arguments
# @return [Object] whatever the underlying +perform_async+ call returns
def self.perform_async(workflow, *args)
  queued_worker = set(queue: worker_queue)
  queued_worker.send(:perform_async, workflow.serialize, *args)
end

.perform_workflow(workflow, on_success: nil, on_success_options: {}, on_death: nil, on_death_options: {}) {|batch| ... } ⇒ Object

Yields:

  • (batch)


59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/sidekiq_workflows/worker.rb', line 59

# Starts +workflow+ inside a freshly created root Sidekiq batch.
#
# The batch gets the configured callback queue (when one is set), a
# description containing the workflow UUID (or '-' when there is none), and
# the optional success/death callbacks. Each callback's options are merged
# with the workflow UUID under +:workflow_uuid+. The batch is yielded to the
# caller's block before the workflow job is enqueued inside it.
#
# @param workflow [#workflow_uuid] workflow to run; passed to +perform_async+
# @param on_success [Object, nil] success callback target; skipped when falsy
# @param on_success_options [Hash] extra options for the success callback
# @param on_death [Object, nil] death callback target; skipped when falsy
# @param on_death_options [Hash] extra options for the death callback
# @yield [batch] the configured batch, before any job is added to it
# @return [Object] the batch's +bid+
def self.perform_workflow(workflow, on_success: nil, on_success_options: {}, on_death: nil, on_death_options: {})
  root_batch = Sidekiq::Batch.new

  configured_queue = SidekiqWorkflows.callback_queue
  root_batch.callback_queue = configured_queue unless configured_queue.nil?
  root_batch.description = "Workflow #{workflow.workflow_uuid || '-'} root batch"

  # Registration order matters to callers inspecting the batch: success first, then death.
  { success: [on_success, on_success_options], death: [on_death, on_death_options] }.each do |event, (target, extra)|
    next unless target

    root_batch.on(event, target, extra.merge(workflow_uuid: workflow.workflow_uuid))
  end

  yield root_batch if block_given?

  root_batch.jobs { perform_async(workflow) }
  root_batch.bid
end

Instance Method Details

#on_death(status, options) ⇒ Object



46
47
48
49
50
51
52
53
# File 'lib/sidekiq_workflows/worker.rb', line 46

# Batch death callback.
#
# Rebuilds the workflow from <tt>options['workflow']</tt> and, when the
# workflow declares an +on_partial_complete+ handler in "ClassName#method"
# form, instantiates that class and calls the method with +status+ and
# +options+.
#
# @param status [Object] batch status passed through to the handler
# @param options [Hash] callback options; must hold the workflow under 'workflow'
# @return [Object, nil] the handler's return value, or nil when no handler is set
def on_death(status, options)
  node = ensure_deserialized(options['workflow'])
  handler_spec = node.on_partial_complete
  return unless handler_spec

  class_name, method_name = handler_spec.split('#')
  handler = ActiveSupport::Inflector.constantize(class_name).new
  handler.send(method_name, status, options)
end

#on_success(status, options) ⇒ Object



35
36
37
38
39
40
41
42
43
44
# File 'lib/sidekiq_workflows/worker.rb', line 35

# Batch success callback.
#
# Rebuilds the workflow from <tt>options['workflow']</tt>. When the workflow
# declares an +on_partial_complete+ handler in "ClassName#method" form, that
# class is instantiated and the method is called with +status+ and +options+.
# Afterwards the workflow and <tt>status.parent_batch</tt> are handed to
# +perform_children+.
#
# @param status [#parent_batch] batch status passed through to the handler
# @param options [Hash] callback options; must hold the workflow under 'workflow'
# @return [Object] whatever +perform_children+ returns
def on_success(status, options)
  node = ensure_deserialized(options['workflow'])

  handler_spec = node.on_partial_complete
  if handler_spec
    class_name, method_name = handler_spec.split('#')
    handler_class = ActiveSupport::Inflector.constantize(class_name)
    handler_class.new.send(method_name, status, options)
  end

  parent = status.parent_batch
  perform_children(parent, node)
end

#perform(workflow) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/sidekiq_workflows/worker.rb', line 9

# Job entry point: executes one workflow node.
#
# The node is rebuilt with +ensure_deserialized+ and dispatched on its class
# name:
#
# * 'SidekiqWorkflows::RootNode' - hands +batch+ and the node to
#   +perform_children+.
# * 'SidekiqWorkflows::WorkerNode' - inside +batch+, creates a child batch
#   whose success/death callbacks point at SidekiqWorkflows::Worker and carry
#   the serialized node, then enqueues every entry of +workflow.workers+ in
#   that child batch (with +perform_in+ when the entry has a +:delay+,
#   otherwise with +perform_async+).
#
# Any other node class is ignored.
#
# NOTE(review): +batch+ is presumably the Sidekiq batch this job is running
# in - it is not defined in this method; confirm against Sidekiq's batch API.
#
# @param workflow [Object] workflow node, in whatever form +ensure_deserialized+ accepts
# @return [Object, nil] result of the branch taken, or nil for unknown node classes
def perform(workflow)
  node = ensure_deserialized(workflow)
  node_type = node.class.name

  if node_type == 'SidekiqWorkflows::RootNode'
    perform_children(batch, node)
  elsif node_type == 'SidekiqWorkflows::WorkerNode'
    batch.jobs do
      child_batch = Sidekiq::Batch.new

      configured_queue = SidekiqWorkflows.callback_queue
      child_batch.callback_queue = configured_queue unless configured_queue.nil?
      child_batch.description = "Workflow #{node.workflow_uuid || '-'}"

      # Success is registered before death, each with its own freshly built options.
      {
        success: 'SidekiqWorkflows::Worker#on_success',
        death: 'SidekiqWorkflows::Worker#on_death'
      }.each do |event, target|
        child_batch.on(event, target, workflow: node.serialize, workflow_uuid: node.workflow_uuid)
      end

      child_batch.jobs do
        node.workers.each do |entry|
          delay = entry[:delay]
          job_class = entry[:worker]
          job_args = entry[:payload]

          if delay
            job_class.perform_in(delay, *job_args)
          else
            job_class.perform_async(*job_args)
          end
        end
      end
    end
  end
end