Class: Rworkflow::Worker

Inherits:
Object
  • Object
show all
Includes:
SidekiqHelper, Sidekiq::Worker
Defined in:
lib/rworkflow/minitest/worker.rb,
lib/rworkflow/worker.rb

Overview

Disable pushing back indefinitely

Class Method Summary collapse

Instance Method Summary collapse

Methods included from SidekiqHelper

configure_client, configure_server, included, queue_sizes

Constructor Details

#initialize(*args) ⇒ Worker

Returns a new instance of Worker.



4
5
6
7
# File 'lib/rworkflow/minitest/worker.rb', line 4

def initialize(*args)
  super
  @__pushed_back = []
end

Class Method Details

.generate_lifecycle(&block) ⇒ Object



44
45
46
47
48
49
# File 'lib/rworkflow/worker.rb', line 44

def generate_lifecycle(&block)
  return Rworkflow::Lifecycle.new do |lc|
    lc.state(self.class.name, worker: self.class, &block)
    lc.initial = self.class.name
  end
end

.load_workflow(id) ⇒ Object



51
52
53
54
55
56
57
# File 'lib/rworkflow/worker.rb', line 51

def load_workflow(id)
  workflow = Flow.load(id)
  return workflow if !workflow.nil? && workflow.valid?

  Rails.logger.warn("Worker #{self.name} tried to load non existent workflow #{id}")
  return nil
end

Instance Method Details

#perform(id, state_name) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/rworkflow/worker.rb', line 8

def perform(id, state_name)
  @workflow = self.class.load_workflow(id)
  @state_name = state_name
  if !@workflow.nil?
    if !@workflow.paused?
      @workflow.fetch(self.jid, state_name) do |objects|
        if objects.present?
          Rails.logger.debug("Starting #{self.class}::process() (flow #{id})")
          process(objects)
          Rails.logger.debug("Finished #{self.class}::process() (flow #{id})")
        else
          Rails.logger.debug("No objects to process for #{self.class}")
        end
      end
    end
  end
rescue Exception => e
  Rails.logger.error("Exception produced on #{@state_name} for flow #{id} on perform: #{e.message}\n#{e.backtrace}")
  raise e
end

#process(_objects) ⇒ Object

Raises:

  • (NotImplementedError)


39
40
41
# File 'lib/rworkflow/worker.rb', line 39

def process(_objects)
  raise NotImplementedError
end

#push_back(objects) ⇒ Object



34
35
36
37
# File 'lib/rworkflow/worker.rb', line 34

def push_back(objects)
  @workflow.push(objects, @state_name)
  Rails.logger.debug("State #{@state_name} pushed back #{Array.wrap(objects).size} objects (flow #{@workflow.id})")
end

#pushed_backObject



9
10
11
# File 'lib/rworkflow/minitest/worker.rb', line 9

def pushed_back
  return @__pushed_back
end

#transition(to_state, objects) ⇒ Object



29
30
31
32
# File 'lib/rworkflow/worker.rb', line 29

def transition(to_state, objects)
  @workflow.transition(@state_name, to_state, objects)
  Rails.logger.debug("State #{@state_name} transitioned #{Array.wrap(objects).size} objects to state #{to_state} (flow #{@workflow.id})")
end