Class: BackRun::Worker
- Inherits:
-
Object
- Object
- BackRun::Worker
- Defined in:
- lib/back_run/worker.rb
Instance Attribute Summary collapse
-
#queues ⇒ Object
readonly
Returns the value of attribute queues.
Instance Method Summary collapse
-
#initialize(pubsub, queues, threads, pool = nil) ⇒ Worker
constructor
A new instance of Worker.
- #kill_job(job) ⇒ Object
- #message_received(job, ack, modify_ack_deadline) ⇒ Object
- #retry_job(job) ⇒ Object
- #start_listening! ⇒ Object
Constructor Details
#initialize(pubsub, queues, threads, pool = nil) ⇒ Worker
Returns a new instance of Worker.
9 10 11 12 13 14 15 |
# File 'lib/back_run/worker.rb', line 9 def initialize(pubsub, queues, threads, pool = nil) @queues = queues @pubsub = pubsub @pool = pool || Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: threads, max_queue: 1 ) end |
Instance Attribute Details
#queues ⇒ Object (readonly)
Returns the value of attribute queues.
7 8 9 |
# File 'lib/back_run/worker.rb', line 7 def queues @queues end |
Instance Method Details
#kill_job(job) ⇒ Object
39 40 41 |
# File 'lib/back_run/worker.rb', line 39 def kill_job(job) @pubsub.kill_job(job) end |
#message_received(job, ack, modify_ack_deadline) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/back_run/worker.rb', line 22 def (job, ack, modify_ack_deadline) if job.should_run_now? ack.call run_job(job) else remaining_seconds = job.remaining_seconds_to_run modify_ack_deadline.call(remaining_seconds) end rescue Concurrent::RejectedExecutionError BackRun.logger.info('Thread pool busy. Republishing the job') retry_job(job) end |
#retry_job(job) ⇒ Object
35 36 37 |
# File 'lib/back_run/worker.rb', line 35 def retry_job(job) @pubsub.publish(job) end |
#start_listening! ⇒ Object
17 18 19 20 |
# File 'lib/back_run/worker.rb', line 17 def start_listening! @pubsub.subscribe(self) sleep end |