Class: Workhorse::Poller

Inherits:
Object
  • Object
show all
Defined in:
lib/workhorse/poller.rb

Constant Summary collapse

MIN_LOCK_TIMEOUT =

In seconds

0.1
MAX_LOCK_TIMEOUT =

In seconds

1.0
ORACLE_LOCK_MODE =

X_MODE (exclusive)

6
ORACLE_LOCK_HANDLE =

Randomly chosen number

478_564_848

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker, before_poll = proc { true }) ⇒ Poller

Returns a new instance of Poller.



12
13
14
15
16
17
18
19
20
21
# File 'lib/workhorse/poller.rb', line 12

# Builds a poller bound to +worker+. The poller is created in the stopped
# state; call #start to begin polling.
#
# @param worker [Object] the worker this poller belongs to; it is used for
#   logging and is shut down when polling has to stop
# @param before_poll [#call] hook invoked before every poll in #start; when
#   it returns a falsy value the worker is shut down instead of polling.
#   Defaults to a proc that always returns true.
def initialize(worker, before_poll = proc { true })
  # Collaborators handed in by the caller.
  @worker = worker
  @before_poll = before_poll

  # Database-related state.
  @table = Workhorse::DbJob.arel_table
  adapter_name = ActiveRecord::Base.connection.adapter_name
  @is_oracle = (adapter_name == 'OracleEnhanced')

  # Runtime state and counters, all starting from their idle values.
  @running = false
  @instant_repoll = Concurrent::AtomicBoolean.new(false)
  @global_lock_fails = 0
  @max_global_lock_fails_reached = false
end

Instance Attribute Details

#table ⇒ Object (readonly)

Returns the value of attribute table.



10
11
12
# File 'lib/workhorse/poller.rb', line 10

# Read-only accessor for the table object stored in @table.
#
# @return [Object] the value of @table (set from
#   Workhorse::DbJob.arel_table in the constructor)
def table
  @table
end

#worker ⇒ Object (readonly)

Returns the value of attribute worker.



9
10
11
# File 'lib/workhorse/poller.rb', line 9

# Read-only accessor for the worker this poller was constructed with.
#
# @return [Object] the value of @worker
def worker
  @worker
end

Instance Method Details

#instant_repoll! ⇒ Object

Call this to interrupt current sleep and perform the next poll as soon as possible, then resume in the normal polling interval.



70
71
72
73
# File 'lib/workhorse/poller.rb', line 70

# Requests that the current sleep be cut short so the next poll happens as
# soon as possible. Logs the request at debug level through the worker and
# raises the @instant_repoll flag.
#
# @return [Object] whatever @instant_repoll.make_true returns
def instant_repoll!
  worker.log('Aborting next sleep to perform instant repoll', :debug)
  @instant_repoll.make_true
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


23
24
25
# File 'lib/workhorse/poller.rb', line 23

# Tells whether the poller is currently marked as running.
#
# @return [Boolean] the value of @running (false after construction, true
#   once #start has been called, false again after #shutdown or after the
#   poll loop stopped because of an exception)
def running?
  @running
end

#shutdown ⇒ Object



58
59
60
61
62
# File 'lib/workhorse/poller.rb', line 58

# Stops the poller: clears the running flag so the poll loop exits on its
# next check, then blocks until the polling thread has finished.
#
# @raise [RuntimeError] if the poller is not running
# @return [Object] the result of #wait
def shutdown
  raise 'Poller is not running.' unless running?

  @running = false
  wait
end

#start ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/workhorse/poller.rb', line 27

# Starts the poller: marks it as running, optionally cleans stuck jobs, and
# spawns the background thread that keeps polling until the poller is no
# longer running.
#
# @raise [RuntimeError] if the poller is already running
# @return [Thread] the newly created polling thread
def start
  fail 'Poller is already running.' if running?
  @running = true

  # One-off cleanup before the loop starts, only when enabled through
  # Workhorse.clean_stuck_jobs.
  clean_stuck_jobs! if Workhorse.clean_stuck_jobs

  @thread = Thread.new do
    loop do
      # #shutdown sets @running to false; the loop notices it here.
      break unless running?

      begin
        # A falsy result from the before_poll hook means: do not poll, shut
        # the worker down instead. The shutdown is started in its own thread;
        # presumably because shutting the worker down ends up joining this
        # polling thread, which cannot be done from inside it - confirm
        # against the worker's shutdown implementation.
        unless @before_poll.call
          Thread.new { worker.shutdown }
          # NOTE(review): `sleep` is called without arguments; presumably a
          # Poller-private helper defined outside this excerpt (Kernel#sleep
          # without a duration would never return on its own) - confirm.
          sleep
          next
        end

        poll
        sleep
      # NOTE(review): rescuing Exception also traps non-StandardError
      # exceptions raised in this thread; this looks intentional so that any
      # failure stops the poller - confirm.
      rescue Exception => e
        worker.log %(Poll encountered exception:\n#{e.message}\n#{e.backtrace.join("\n")})
        worker.log 'Worker shutting down...'
        # Report to the configured exception handler unless poller exceptions
        # are silenced.
        Workhorse.on_exception.call(e) unless Workhorse.silence_poller_exceptions
        @running = false
        # NOTE(review): reaches into the worker's @pool instance variable
        # directly instead of going through a public worker method.
        worker.instance_variable_get(:@pool).shutdown
        # Leave the loop, which ends the polling thread.
        break
      end
    end
  end
end

#wait ⇒ Object



64
65
66
# File 'lib/workhorse/poller.rb', line 64

# Blocks until the polling thread created by #start has finished.
#
# Safe to call before #start has ever been invoked: there is then no thread
# to wait for, so the method returns nil right away instead of failing with a
# NoMethodError on nil.
#
# @return [Thread, nil] the joined polling thread, or nil if no thread has
#   been started yet
def wait
  @thread.join if @thread
end