Class: Sequel::Worker
Overview
A Worker is a thread that takes jobs off a work queue and processes them in the background. It accepts an optional database; when one is given, the worker runs all of its work inside a transaction on that database.
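The idea can be illustrated with a minimal, self-contained sketch built only on Ruby's standard `Thread` and `Queue`. `SketchWorker` is a hypothetical stand-in written for this example, not Sequel's implementation: it stops via a `:stop` marker rather than an exception, it omits the database option, and it assumes that a failing job is recorded in `errors` instead of killing the thread (the page does not show how jobs are run).

```ruby
# Hypothetical stand-in for illustration only; not Sequel::Worker itself.
class SketchWorker
  attr_reader :errors

  def initialize
    @queue = Queue.new
    @errors = []
    @thread = Thread.new { run }
  end

  # Queue a job given as a proc or a block; returns self so calls can chain.
  def async(proc = nil, &block)
    @queue << (proc || block)
    self
  end

  # Push a stop marker behind the queued jobs and wait for the thread to exit.
  def join
    @queue << :stop
    @thread.join
    self
  end

  private

  # Pop and run jobs until the stop marker arrives, collecting failures.
  def run
    while (job = @queue.pop) != :stop
      begin
        job.call
      rescue StandardError => e
        @errors << e
      end
    end
  end
end

results = []
worker = SketchWorker.new
worker.async { results << 1 }
worker.async { raise "boom" }
worker.async(proc { results << 2 })
worker.join
```

After `join` returns, `results` holds the values written by the two successful jobs and `worker.errors` holds the single exception raised by the failing one.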
Instance Attribute Summary
-
#errors ⇒ Object
readonly
Returns the value of attribute errors.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary
-
#async(proc = nil, &block) ⇒ Object
(also: #add, #<<)
Add a job to the queue, specified either as a proc argument or as a block.
-
#busy? ⇒ Boolean
Whether the worker is actively working and/or has work scheduled.
-
#initialize(db = nil) ⇒ Worker
constructor
Set up the internal variables.
-
#join ⇒ Object
Wait until the worker is no longer busy and then stop working.
-
#work ⇒ Object
Continually get jobs from the work queue and process them.
Constructor Details
#initialize(db = nil) ⇒ Worker
Set up the internal variables. If a database is given, run the thread inside a database transaction. Continue to work until #join is called.
    # File 'lib/sequel_core/worker.rb', line 12
    def initialize(db = nil)
      @queue = Queue.new
      @errors = []
      t = self
      t.abort_on_exception = true
      @transaction = !db.nil?
      db ? super {db.transaction {t.work}} : super {t.work}
    end
Instance Attribute Details
#errors ⇒ Object (readonly)
Returns the value of attribute errors.
    # File 'lib/sequel_core/worker.rb', line 7
    def errors
      @errors
    end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
    # File 'lib/sequel_core/worker.rb', line 6
    def queue
      @queue
    end
Instance Method Details
#async(proc = nil, &block) ⇒ Object Also known as: add, <<
Add a job to the queue, specified either as a proc argument or as a block.
    # File 'lib/sequel_core/worker.rb', line 23
    def async(proc = nil, &block)
      @queue << (proc || block)
      self
    end
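Because `async` returns `self`, calls can be chained, and the `<<` alias reads naturally for that. The sketch below reproduces only the enqueueing logic in a hypothetical `ChainableJobs` class (a plain array stands in for the work queue, and no thread is involved) to show the proc form, the block form, and the aliases side by side.

```ruby
# Hypothetical class that mirrors only the enqueueing behavior shown above.
class ChainableJobs
  attr_reader :jobs

  def initialize
    @jobs = []
  end

  # Accept a job as a proc argument or as a block; the proc wins if both are given.
  def async(proc = nil, &block)
    @jobs << (proc || block)
    self
  end
  alias_method :add, :async
  alias_method :<<, :async
end

first  = proc { :a }
second = proc { :b }

jobs = ChainableJobs.new
jobs << first << second   # chaining works because each call returns self
jobs.add { :c }           # block form through the add alias

outcomes = jobs.jobs.map(&:call)
```

Running the queued jobs in order yields one value per job, in the order they were added.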
#busy? ⇒ Boolean
Whether the worker is actively working and/or has work scheduled.
    # File 'lib/sequel_core/worker.rb', line 31
    def busy?
      @cur || !@queue.empty?
    end
#join ⇒ Object
Wait until the worker is no longer busy and then stop working.
    # File 'lib/sequel_core/worker.rb', line 36
    def join
      sleep(0.1) while busy?
      self.raise Error::WorkerStop
      super
    end
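The shutdown sequence in `join` (poll until idle, raise a stop exception inside the thread, then wait for the thread to finish) can be tried in isolation with plain Ruby threads. The following is a sketch under stated assumptions: `StopSignal` is a hypothetical stand-in for `Error::WorkerStop`, and idleness is tracked with a mutex-protected counter of unfinished jobs rather than a current-job variable, a substitution made here to keep the example deterministic.

```ruby
# Hypothetical stop signal, standing in for Error::WorkerStop.
class StopSignal < StandardError; end

queue   = Queue.new
lock    = Mutex.new
pending = 0    # jobs queued but not yet finished
done    = []

thread = Thread.new do
  begin
    loop do
      job = queue.pop
      job.call
      lock.synchronize { pending -= 1 }
    end
  rescue StopSignal
    # The stop signal only ends the loop; there is nothing else to clean up.
  end
end

3.times do |i|
  lock.synchronize { pending += 1 }
  queue << proc { done << i }
end

# Poll until idle, then interrupt the thread and wait for it to exit.
sleep(0.01) while lock.synchronize { pending > 0 }
thread.raise(StopSignal)
thread.join
```

`Thread#raise` delivers the exception asynchronously to the target thread, which is why the loop's `rescue` clause is what ends the work.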
#work ⇒ Object
Continually get jobs from the work queue and process them.
    # File 'lib/sequel_core/worker.rb', line 43
    def work
      begin
        loop {next_job}
      rescue Sequel::Error::WorkerStop # signals the worker thread to stop
      ensure
        raise Sequel::Error::Rollback if @transaction && !@errors.empty?
      end
    end
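The `ensure` clause means that, when a transaction is in use and any errors were recorded, the worker raises a rollback as it exits. A rough sketch of that control flow follows. `FakeDB`, `Rollback`, and `run_jobs` are hypothetical names introduced for this example; it assumes that the transaction block treats the rollback exception as a quiet signal to roll back, and that job failures are collected rather than propagated.

```ruby
# Hypothetical rollback signal, standing in for Sequel::Error::Rollback.
class Rollback < StandardError; end

# Hypothetical database handle that only records how the transaction ended.
class FakeDB
  attr_reader :state

  def transaction
    yield
    @state = :committed
  rescue Rollback
    @state = :rolled_back
  end
end

# Run every job inside one transaction; request a rollback if any job failed.
def run_jobs(db, jobs)
  errors = []
  db.transaction do
    begin
      jobs.each do |job|
        begin
          job.call
        rescue StandardError => e
          errors << e
        end
      end
    ensure
      raise Rollback unless errors.empty?
    end
  end
  errors
end

ok_db = FakeDB.new
ok_errors = run_jobs(ok_db, [proc { 1 + 1 }])

bad_db = FakeDB.new
bad_errors = run_jobs(bad_db, [proc { raise ArgumentError, "boom" }, proc { 2 }])
```

Note that the failing job does not stop later jobs from running; the rollback decision is made once, after the loop ends.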