Class: BackRun::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/back_run/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pubsub, queues, threads, pool = nil) ⇒ Worker

Returns a new instance of Worker.



9
10
11
12
13
14
15
# File 'lib/back_run/worker.rb', line 9

# Builds a worker bound to a pub/sub client and a set of queues.
#
# @param pubsub [Object] pub/sub client; elsewhere in this class it receives
#   +subscribe+, +publish+ and +kill_job+
# @param queues [Object] queues this worker is responsible for (exposed via #queues)
# @param threads [Integer] upper bound on pool threads, used only when +pool+ is not given
# @param pool [Object, nil] executor to use instead of the default thread pool
def initialize(pubsub, queues, threads, pool = nil)
  @queues = queues
  @pubsub = pubsub
  @pool = pool
  # Fall back to a small bounded pool when the caller did not inject one.
  # NOTE(review): with max_queue: 1 the executor presumably rejects work once
  # saturated; #message_received rescues that rejection — confirm against
  # the concurrent-ruby documentation.
  @pool ||= Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: threads, max_queue: 1)
end

Instance Attribute Details

#queues ⇒ Object (readonly)

Returns the value of attribute queues.



7
8
9
# File 'lib/back_run/worker.rb', line 7

# Read-only accessor for the queues this worker was constructed with.
#
# @return [Object] the value passed as +queues+ to the constructor
def queues
  @queues
end

Instance Method Details

#kill_job(job) ⇒ Object



39
40
41
# File 'lib/back_run/worker.rb', line 39

# Forwards the job to the pub/sub client's +kill_job+.
#
# NOTE(review): what "killing" a job entails is decided by the pub/sub
# client and is not visible here — confirm against its implementation.
#
# @param job [Object] the job to hand over
# @return [Object] whatever the pub/sub client's +kill_job+ returns
def kill_job(job)
  @pubsub.kill_job(job)
end

#message_received(job, ack, modify_ack_deadline) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/back_run/worker.rb', line 22

# Handles a single job delivered to this worker.
#
# When the job is due it is acknowledged first and then run. When it is not
# due, the ack deadline callback is invoked with the number of seconds the
# job still has to wait. If running the job raises
# Concurrent::RejectedExecutionError, the job is republished via #retry_job.
#
# @param job [#should_run_now?, #remaining_seconds_to_run] the received job
# @param ack [#call] called with no arguments before the job is run
# @param modify_ack_deadline [#call] called with the remaining seconds when
#   the job is not yet due
# @return [Object] the result of the last call performed on the taken path
def message_received(job, ack, modify_ack_deadline)
  # Not due yet: push the deadline out instead of acknowledging.
  return modify_ack_deadline.call(job.remaining_seconds_to_run) unless job.should_run_now?

  # Acknowledge before running, so a rejected job has to be republished
  # explicitly in the rescue below.
  ack.call
  run_job(job)
rescue Concurrent::RejectedExecutionError
  BackRun.logger.info('Thread pool busy. Republishing the job')
  retry_job(job)
end

#retry_job(job) ⇒ Object



35
36
37
# File 'lib/back_run/worker.rb', line 35

# Publishes the job again through the pub/sub client.
#
# Used by #message_received when the thread pool rejects the job.
#
# @param job [Object] the job to republish
# @return [Object] whatever the pub/sub client's +publish+ returns
def retry_job(job)
  @pubsub.publish(job)
end

#start_listening! ⇒ Object



17
18
19
20
# File 'lib/back_run/worker.rb', line 17

# Subscribes this worker to the pub/sub client and then blocks the caller.
#
# NOTE(review): presumably the pub/sub client delivers jobs to
# #message_received from its own threads while this one sleeps — confirm
# against the subscriber implementation.
def start_listening!
  @pubsub.subscribe(self)
  # Kernel#sleep without an argument sleeps forever, keeping the calling
  # thread parked after the subscription has been set up.
  sleep
end