Class: Kestrel::Client::Transactional

Inherits:
Proxy
  • Object
Defined in:
lib/kestrel/client/transactional.rb

Defined Under Namespace

Classes: MultipleQueueException, NoOpenTransaction, RetriesExceeded, RetryableJob

Constant Summary

DEFAULT_RETRIES = 10

Number of times to retry a job before giving up

ERROR_PROCESSING_RATE = 0.1

Fraction of the time during ‘normal’ processing that the error queue is checked first (0.1 means 10%)

Instance Attribute Summary

Attributes inherited from Proxy

#client

Instance Method Summary

Methods inherited from Proxy

#method_missing

Constructor Details

#initialize(client, max_retries = nil, error_rate = nil) ⇒ Transactional

Parameters

client<Kestrel::Client>

Client

max_retries<Integer>

Number of times to retry a job before giving up. Defaults to DEFAULT_RETRIES

error_rate<Float>

Fraction of the time (0.0 to 1.0) during ‘normal’ processing that the error queue is checked first. Defaults to ERROR_PROCESSING_RATE

per_server<Integer>

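Number of gets to execute against a single server, before changing servers. Defaults to MAX_PER_SERVER. Note that the constructor signature above takes no per_server argument and this class defines no MAX_PER_SERVER constant, so this entry does not apply to Transactional as shown.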



# File 'lib/kestrel/client/transactional.rb', line 37

def initialize(client, max_retries = nil, error_rate = nil)
  @max_retries = max_retries || DEFAULT_RETRIES
  @error_rate  = error_rate || ERROR_PROCESSING_RATE
  @counter     = 0 # Command counter
  super(client)
end
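The defaulting in the constructor relies on Ruby's `||`, which falls back only for nil or false. The following is a minimal sketch of that behaviour, not the real class: DefaultsSketch is a hypothetical stand-in that copies the two constants and the two assignments from the listing above and omits the client argument.

```ruby
# Hypothetical stand-in that mirrors only the default handling shown above.
# It is not Kestrel::Client::Transactional and takes no client.
class DefaultsSketch
  DEFAULT_RETRIES = 10
  ERROR_PROCESSING_RATE = 0.1

  attr_reader :max_retries, :error_rate

  def initialize(max_retries = nil, error_rate = nil)
    # nil (or an omitted argument) falls back to the constant.
    @max_retries = max_retries || DEFAULT_RETRIES
    @error_rate  = error_rate || ERROR_PROCESSING_RATE
  end
end

defaults = DefaultsSketch.new
zeroed   = DefaultsSketch.new(0, 0.0)

puts "defaults: #{defaults.max_retries}, #{defaults.error_rate}"
puts "zeroed:   #{zeroed.max_retries}, #{zeroed.error_rate}"
```

Because 0 and 0.0 are truthy in Ruby, explicit zeros are kept rather than replaced by the defaults. With the logic shown in this page, an error rate of 0.0 would mean the error queue is never sampled, and a max_retries of 0 would make the first call to #retry raise.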

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class Kestrel::Client::Proxy

Instance Attribute Details

#current_queue ⇒ Object (readonly)

Returns the value of attribute current_queue.



# File 'lib/kestrel/client/transactional.rb', line 44

def current_queue
  @current_queue
end

Instance Method Details

#close_last_transaction ⇒ Object

:nodoc:



# File 'lib/kestrel/client/transactional.rb', line 81

def close_last_transaction #:nodoc:
  return unless @last_read_queue

  client.get_from_last(@last_read_queue, :close => true)
  @last_read_queue = @current_queue = @job = nil
end

#current_try ⇒ Object

Returns the 1-based number of the current attempt: the fetched job's retry count plus one.



# File 'lib/kestrel/client/transactional.rb', line 77

def current_try
  @job.retries + 1
end

#get(key, opts = {}) ⇒ Object

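Returns a job from the key queue roughly (1 - error_rate) of the time, where error_rate defaults to ERROR_PROCESSING_RATE. The rest of the time, reads from the error queue (key + "_errors") instead and returns a retryable job from it. Any transaction left open by the previous get is closed first, and the new read opens a transaction on the queue it reads from.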

Returns

Job, possibly retryable, or nil

Raises

MultipleQueueException

if a transaction is already open on a queue other than key



# File 'lib/kestrel/client/transactional.rb', line 56

def get(key, opts = {})
  raise MultipleQueueException if current_queue && key != current_queue

  close_last_transaction

  if read_from_error_queue?
    queue = key + "_errors"
    job = client.get_from_last(queue, opts.merge(:open => true))
  else
    queue = key
    job = client.get(queue, opts.merge(:open => true))
  end

  if job
    @job = job.is_a?(RetryableJob) ? job : RetryableJob.new(0, job)
    @last_read_queue = queue
    @current_queue = key
    @job.job
  end
end
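The listing above calls read_from_error_queue?, whose body does not appear in this page. The following sketch shows one plausible form of that check together with the queue naming used by #get. The ErrorQueueSampling module, its method signatures, and the uniform random draw are all assumptions made for illustration.

```ruby
# Assumption: the real read_from_error_queue? is not shown in this page.
# A uniform random draw against the error rate is one plausible form.
module ErrorQueueSampling
  # Returns true when this read should go to the error queue.
  def self.read_from_error_queue?(error_rate, rng = Random.new)
    rng.rand < error_rate
  end

  # Picks the queue name the way #get does: key + "_errors" or key itself.
  def self.queue_for(key, error_rate, rng = Random.new)
    read_from_error_queue?(error_rate, rng) ? key + "_errors" : key
  end
end

rng = Random.new(42) # seeded so repeated runs draw the same sequence
counts = Hash.new(0)
10_000.times { counts[ErrorQueueSampling.queue_for("jobs", 0.1, rng)] += 1 }
counts.each { |queue, n| puts "#{queue}: #{n}" }
```

With a rate of 0.1, about one read in ten should target "jobs_errors". In the #get listing, a read that targets an empty error queue returns nil rather than falling back to the main queue, so callers may see occasional nil results even when the main queue has jobs.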

#retry(item = nil) ⇒ Object

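Enqueues the current job on the error queue for later retry. Once the job's retry count reaches max_retries (DEFAULT_RETRIES unless overridden in the constructor), gives up and raises RetriesExceeded instead. The open transaction is closed in either case.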

Parameters

item (optional)

If specified, the job is sent to the error queue with the given payload instead of what was originally fetched.

Returns

Boolean

true if the job is enqueued in the retry queue, false otherwise. Exhausting the retry limit does not return false; it raises RetriesExceeded.

Raises

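NoOpenTransaction

if no job has been fetched with get since the last transaction was closed

RetriesExceeded

if the job's retry count has reached max_retries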



# File 'lib/kestrel/client/transactional.rb', line 103

def retry(item = nil)
  raise NoOpenTransaction unless @last_read_queue

  job = item ? RetryableJob.new(@job.retries, item) : @job.dup

  job.retries += 1

  if job.retries < @max_retries
    client.set(current_queue + "_errors", job)
  else
    raise RetriesExceeded.new(job)
  end

ensure
  close_last_transaction
end
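The retry limit can be traced with a small in-memory model. This is a sketch only: RetryableJobSketch, RetriesExceededSketch, and RetrySketch are hypothetical names, an array stands in for the Kestrel error queue, and the return value of true is an assumption, since the real method returns whatever client.set returns.

```ruby
# Hypothetical in-memory model of the bookkeeping in #retry.
# No Kestrel server or client is involved.
RetryableJobSketch = Struct.new(:retries, :job)

class RetriesExceededSketch < StandardError
  attr_reader :job

  def initialize(job)
    @job = job
    super("retries exceeded")
  end
end

class RetrySketch
  attr_reader :error_queue

  def initialize(max_retries)
    @max_retries = max_retries
    @error_queue = [] # stands in for the key + "_errors" queue
  end

  # Mirrors the branch in #retry: re-enqueue while under the limit, else raise.
  def retry(job)
    attempt = job.dup
    attempt.retries += 1
    raise RetriesExceededSketch.new(attempt) unless attempt.retries < @max_retries

    @error_queue << attempt
    true # assumption: the real return value comes from client.set
  end
end

sketch = RetrySketch.new(3)
job = RetryableJobSketch.new(0, "payload")
begin
  loop do
    sketch.retry(job)
    job = sketch.error_queue.shift
    puts "re-enqueued, retries=#{job.retries}"
  end
rescue RetriesExceededSketch => e
  puts "gave up at retries=#{e.job.retries}"
end
```

With max_retries set to 3, the job is re-enqueued twice and the third call raises, so a job is processed at most max_retries times in total, counting the original attempt.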