Class: Falqon::Strategies::Linear
- Inherits: Falqon::Strategy
  - Object
  - Falqon::Strategy
  - Falqon::Strategies::Linear
- Defined in:
- lib/falqon/strategies/linear.rb
Overview
Retry strategy that retries a fixed number of times
When a message fails to process, it is moved to the scheduled queue and retried after a fixed delay (configured by Queue#retry_delay). If a message still fails to process after the maximum number of retries (configured by Queue#max_retries), it is marked as dead and moved to the dead subqueue.
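The retry decision can be isolated as a small pure function. This is only a sketch: `next_status` is a hypothetical helper, not part of Falqon. It mirrors the branching in #retry, including two cases visible in the method source: a `max_retries` of -1 means the limit is never reached, and a zero `retry_delay` puts the message straight back on the pending queue.

```ruby
# Hypothetical helper, not part of Falqon: returns the status a message
# would be given after a failed attempt under the linear strategy.
#
# retries     - number of retries so far, including the current one
# max_retries - configured maximum, or -1 for unlimited retries
# retry_delay - configured delay in seconds between retries
def next_status(retries:, max_retries:, retry_delay:)
  if retries < max_retries || max_retries == -1
    # Retries remain: wait for the delay, or requeue immediately if there is none
    retry_delay.positive? ? :scheduled : :pending
  else
    # Retries exhausted: the message is killed
    :dead
  end
end

next_status(retries: 1, max_retries: 3, retry_delay: 60)  # => :scheduled
next_status(retries: 1, max_retries: 3, retry_delay: 0)   # => :pending
next_status(retries: 3, max_retries: 3, retry_delay: 60)  # => :dead
next_status(retries: 9, max_retries: -1, retry_delay: 60) # => :scheduled
```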
When using the linear strategy with a non-zero retry delay, a scheduler needs to be started to retry the messages after the configured delay.
queue = Falqon::Queue.new("my_queue")
# Start the watcher in a separate thread
Thread.new { loop { queue.schedule; sleep 1 } }
# Or start the watcher in a separate fiber
Fiber
.new { loop { queue.schedule; sleep 1 } }
.resume
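The loops above run until the process exits. If the scheduler needs a clean shutdown, one possible approach is a stoppable thread, sketched below. It assumes only that the queue responds to #schedule as in the example above; `start_scheduler` and the stand-in queue are hypothetical and exist purely for illustration.

```ruby
# Hypothetical helper, not part of Falqon: calls queue.schedule every
# `interval` seconds in a background thread until the returned lambda is called.
def start_scheduler(queue, interval: 1)
  stop = false

  thread = Thread.new do
    until stop
      queue.schedule
      sleep interval
    end
  end

  # Calling the lambda stops the loop and waits for the thread to finish
  -> { stop = true; thread.join }
end

# Stand-in queue used only for this demonstration; in an application this
# would be a Falqon::Queue instance
fake_queue_class = Struct.new(:calls) do
  def schedule
    self.calls += 1
  end
end

queue = fake_queue_class.new(0)
stop_scheduler = start_scheduler(queue, interval: 0.01)
sleep 0.05
stop_scheduler.call
```

Joining the thread on shutdown ensures that an in-progress scheduling pass finishes before the program moves on.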
Instance Method Details
#retry(message, error) ⇒ Object
# File 'lib/falqon/strategies/linear.rb', line 36

def retry(message, error)
  queue.redis.with do |r|
    # Increment retry count
    retries = r.hincrby("#{queue.id}:metadata:#{message.id}", :retries, 1)

    r.multi do |t|
      # Set error metadata
      t.hset(
        "#{queue.id}:metadata:#{message.id}",
        :retried_at, Time.now.to_i,
        :retry_error, error.message,
      )

      if retries < queue.max_retries || queue.max_retries == -1
        if queue.retry_delay.positive?
          queue.logger.debug "Scheduling message #{message.id} on queue #{queue.name} in #{queue.retry_delay} seconds (attempt #{retries})"

          # Add identifier to scheduled queue
          queue.scheduled.add(message.id, Time.now.to_i + queue.retry_delay)

          # Set message status
          t.hset("#{queue.id}:metadata:#{message.id}", :status, "scheduled")
        else
          queue.logger.debug "Requeuing message #{message.id} on queue #{queue.name} (attempt #{retries})"

          # Add identifier back to pending queue
          queue.pending.add(message.id)

          # Set message status
          t.hset("#{queue.id}:metadata:#{message.id}", :status, "pending")
        end
      else
        # Kill message after max retries
        message.kill

        # Set message status
        t.hset("#{queue.id}:metadata:#{message.id}", :status, "dead")
      end

      # Remove identifier from processing queue
      queue.processing.remove(message.id)
    end
  end
end