Class: Kestrel::Client::Transactional
- Defined in:
- lib/kestrel/client/transactional.rb
Defined Under Namespace
Classes: MultipleQueueException, NoOpenTransaction, RetriesExceeded, RetryableJob
Constant Summary collapse
- DEFAULT_RETRIES =
Number of times to retry a job before giving up
10
- ERROR_PROCESSING_RATE =
Pct. of the time during ‘normal’ processing we check the error queue first
0.1
Instance Attribute Summary collapse
-
#current_queue ⇒ Object
readonly
Returns the value of attribute current_queue.
Attributes inherited from Proxy
Instance Method Summary collapse
-
#close_last_transaction ⇒ Object
:nodoc:.
- #current_try ⇒ Object
-
#get(key, opts = {}) ⇒ Object
Returns job from the
key
queue 1 - ERROR_PROCESSING_RATE pct. -
#initialize(client, max_retries = nil, error_rate = nil) ⇒ Transactional
constructor
Parameters client<Kestrel::Client>:: Client max_retries<Integer>:: Number of times to retry a job before giving up.
-
#retry(item = nil) ⇒ Object
Enqueues the current job on the error queue for later retry.
Methods inherited from Proxy
Constructor Details
#initialize(client, max_retries = nil, error_rate = nil) ⇒ Transactional
Parameters
- client<Kestrel::Client>
-
Client
- max_retries<Integer>
-
Number of times to retry a job before giving up. Defaults to DEFAULT_RETRIES
- error_rate<Float>
-
Pct. of the time during ‘normal’ processing we check the error queue first. Defaults to ERROR_PROCESSING_RATE
- per_server<Integer>
-
Number of gets to execute against a single server, before changing servers. Defaults to MAX_PER_SERVER
37 38 39 40 41 42 |
# File 'lib/kestrel/client/transactional.rb', line 37 def initialize(client, max_retries = nil, error_rate = nil) @max_retries = max_retries || DEFAULT_RETRIES @error_rate = error_rate || ERROR_PROCESSING_RATE @counter = 0 # Command counter super(client) end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method in the class Kestrel::Client::Proxy
Instance Attribute Details
#current_queue ⇒ Object (readonly)
Returns the value of attribute current_queue.
44 45 46 |
# File 'lib/kestrel/client/transactional.rb', line 44 def current_queue @current_queue end |
Instance Method Details
#close_last_transaction ⇒ Object
:nodoc:
81 82 83 84 85 86 |
# File 'lib/kestrel/client/transactional.rb', line 81 def close_last_transaction #:nodoc: return unless @last_read_queue client.get_from_last(@last_read_queue, :close => true) @last_read_queue = @current_queue = @job = nil end |
#current_try ⇒ Object
77 78 79 |
# File 'lib/kestrel/client/transactional.rb', line 77 def current_try @job.retries + 1 end |
#get(key, opts = {}) ⇒ Object
Returns job from the key
queue 1 - ERROR_PROCESSING_RATE pct. of the time. Every so often, checks the error queue for jobs and returns a retryable job.
Returns
Job, possibly retryable, or nil
Raises
MultipleQueueException
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/kestrel/client/transactional.rb', line 56 def get(key, opts = {}) raise MultipleQueueException if current_queue && key != current_queue close_last_transaction if read_from_error_queue? queue = key + "_errors" job = client.get_from_last(queue, opts.merge(:open => true)) else queue = key job = client.get(queue, opts.merge(:open => true)) end if job @job = job.is_a?(RetryableJob) ? job : RetryableJob.new(0, job) @last_read_queue = queue @current_queue = key @job.job end end |
#retry(item = nil) ⇒ Object
Enqueues the current job on the error queue for later retry. If the job has been retried DEFAULT_RETRIES times, gives up entirely.
Parameters
- item (optional)
-
if specified, the job set to the error queue with the given payload instead of what was originally fetched.
Returns
- Boolean
-
true if the job is enqueued in the retry queue, false otherwise
Raises
NoOpenTransaction
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/kestrel/client/transactional.rb', line 103 def retry(item = nil) raise NoOpenTransaction unless @last_read_queue job = item ? RetryableJob.new(@job.retries, item) : @job.dup job.retries += 1 if job.retries < @max_retries client.set(current_queue + "_errors", job) else raise RetriesExceeded.new(job) end ensure close_last_transaction end |