Class: Belated::Client
- Inherits:
-
Object
- Object
- Belated::Client
- Includes:
- Singleton
- Defined in:
- lib/belated/client.rb,
lib/belated/testing.rb
Overview
A client that can perform jobs inline
Instance Attribute Summary collapse
-
#bank ⇒ Object
Returns the value of attribute bank.
-
#banker_thread ⇒ Object
Returns the value of attribute banker_thread.
-
#proc_table ⇒ Object
Returns the value of attribute proc_table.
-
#queue ⇒ Object
Returns the value of attribute queue.
Instance Method Summary collapse
- #delete_from_table ⇒ Object
-
#old_perform ⇒ JobWrapper
The method that pushes the jobs to the queue.
-
#perform(job, at: nil, max_retries: 5, active_job: false) ⇒ JobWrapper
The method that pushes the jobs to the queue.
-
#perform_belated ⇒ JobWrapper
The method that pushes the jobs to the queue.
-
#perform_later ⇒ JobWrapper
The method that pushes the jobs to the queue.
-
#start ⇒ void
(also: #initialize)
Starts up the client.
-
#start_banker_thread ⇒ void
Thread in charge of handling the bank queue.
- #started? ⇒ Boolean
-
#turn_off ⇒ Object
Makes it possible to reset the client.
Instance Attribute Details
#bank ⇒ Object
Returns the value of attribute bank.
14 15 16 |
# File 'lib/belated/client.rb', line 14 def bank @bank end |
#banker_thread ⇒ Object
Returns the value of attribute banker_thread.
14 15 16 |
# File 'lib/belated/client.rb', line 14 def banker_thread @banker_thread end |
#proc_table ⇒ Object
Returns the value of attribute proc_table.
14 15 16 |
# File 'lib/belated/client.rb', line 14 def proc_table @proc_table end |
#queue ⇒ Object
Returns the value of attribute queue.
14 15 16 |
# File 'lib/belated/client.rb', line 14 def queue @queue end |
Instance Method Details
#delete_from_table ⇒ Object
61 62 63 64 65 66 67 68 69 |
# File 'lib/belated/client.rb', line 61 def delete_from_table return if proc_table.length < 25 @mutex.synchronize do proc_table.select { |_k, v| v.completed }.each do |key, _value| proc_table.delete(key) end end end |
#old_perform ⇒ JobWrapper
The method that pushes the jobs to the queue. If there is no connection, it pushes the job to the bank.
24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/belated/testing.rb', line 24 def perform(job, at: nil, max_retries: 5, active_job: false) start unless started? return unless proper_job?(job) job_wrapper = wrap_job(job, at: at.to_f, max_retries: max_retries, active_job: active_job) bank.push(job_wrapper) @mutex.synchronize do proc_table[job_wrapper.object_id] = job_wrapper if job_wrapper.proc_klass end self.banker_thread = start_banker_thread if banker_thread.nil? job_wrapper end |
#perform(job, at: nil, max_retries: 5, active_job: false) ⇒ JobWrapper
The method that pushes the jobs to the queue. If there is no connection, it pushes the job to the bank.
77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/belated/client.rb', line 77 def perform(job, at: nil, max_retries: 5, active_job: false) start unless started? return unless proper_job?(job) job_wrapper = wrap_job(job, at: at.to_f, max_retries: max_retries, active_job: active_job) bank.push(job_wrapper) @mutex.synchronize do proc_table[job_wrapper.object_id] = job_wrapper if job_wrapper.proc_klass end self.banker_thread = start_banker_thread if banker_thread.nil? job_wrapper end |
#perform_belated ⇒ JobWrapper
The method that pushes the jobs to the queue. If there is no connection, it pushes the job to the bank.
89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/belated/client.rb', line 89 def perform(job, at: nil, max_retries: 5, active_job: false) start unless started? return unless proper_job?(job) job_wrapper = wrap_job(job, at: at.to_f, max_retries: max_retries, active_job: active_job) bank.push(job_wrapper) @mutex.synchronize do proc_table[job_wrapper.object_id] = job_wrapper if job_wrapper.proc_klass end self.banker_thread = start_banker_thread if banker_thread.nil? job_wrapper end |
#perform_later ⇒ JobWrapper
The method that pushes the jobs to the queue. If there is no connection, it pushes the job to the bank.
90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/belated/client.rb', line 90 def perform(job, at: nil, max_retries: 5, active_job: false) start unless started? return unless proper_job?(job) job_wrapper = wrap_job(job, at: at.to_f, max_retries: max_retries, active_job: active_job) bank.push(job_wrapper) @mutex.synchronize do proc_table[job_wrapper.object_id] = job_wrapper if job_wrapper.proc_klass end self.banker_thread = start_banker_thread if banker_thread.nil? job_wrapper end |
#start ⇒ void Also known as: initialize
This method returns an undefined value.
Starts up the client. Connects to the queue through DRb.
19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/belated/client.rb', line 19 def start return if started? server_uri = Belated::URI DRb.start_service self.proc_table = {} self.bank = Thread::Queue.new self.queue = DRbObject.new_with_uri(server_uri) @started = true @mutex = Mutex.new end |
#start_banker_thread ⇒ void
This method returns an undefined value.
Thread in charge of handling the bank queue. You probably want to memoize the client in order to avoid having many threads in the sleep state.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/belated/client.rb', line 46 def start_banker_thread Thread.new do loop do delete_from_table sleep Belated.client_heartbeat and next if bank.empty? bank.length.times do queue.push(wrapper = bank.pop) rescue DRb::DRbConnError bank.push(wrapper) end end end end |
#started? ⇒ Boolean
32 33 34 |
# File 'lib/belated/client.rb', line 32 def started? @started end |
#turn_off ⇒ Object
Makes it possible to reset the client
37 38 39 40 |
# File 'lib/belated/client.rb', line 37 def turn_off @started = false banker_thread&.kill end |