Class: Falqon::Strategies::Linear

Inherits:
Falqon::Strategy
  • Object
Defined in:
lib/falqon/strategies/linear.rb

Overview

Retry strategy that retries a failed message up to a fixed number of times.

When a message fails to process, it is moved to the scheduled queue and retried after a fixed delay (configured by Queue#retry_delay). If a message still fails after the maximum number of retries (configured by Queue#max_retries), it is marked as dead and moved to the dead subqueue.
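The decision described above can be sketched as a small pure function. This is an illustration only: `linear_retry_outcome` is a hypothetical helper, not part of Falqon, and it assumes (based on the method source listed under #retry) that a `max_retries` of -1 means the message is retried indefinitely and that a zero delay sends the message straight back to the pending queue.

```ruby
# Hypothetical helper, not part of Falqon: mirrors the retry decision
# described in the overview using plain values.
#
# retries     - number of failed attempts so far (already incremented)
# max_retries - configured limit; -1 is assumed to mean "no limit"
# retry_delay - configured delay in seconds
def linear_retry_outcome(retries:, max_retries:, retry_delay:)
  if retries < max_retries || max_retries == -1
    # Retries remain: schedule with a delay, or requeue immediately
    retry_delay.positive? ? :scheduled : :pending
  else
    # Retry budget exhausted
    :dead
  end
end

linear_retry_outcome(retries: 1, max_retries: 3, retry_delay: 60) # => :scheduled
linear_retry_outcome(retries: 1, max_retries: 3, retry_delay: 0)  # => :pending
linear_retry_outcome(retries: 3, max_retries: 3, retry_delay: 60) # => :dead
```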

When using the linear strategy with a non-zero retry delay, a scheduler must be started to retry messages once the configured delay has elapsed.

queue = Falqon::Queue.new("my_queue")

# Start the watcher in a separate thread
Thread.new { loop { queue.schedule; sleep 1 } }

# Or start the watcher in a separate fiber
Fiber
  .new { loop { queue.schedule; sleep 1 } }
  .resume

Examples:

queue = Falqon::Queue.new("my_queue", retry_strategy: :linear, retry_delay: 60, max_retries: 3)
queue.push("Hello, World!")
queue.pop { raise Falqon::Error }
queue.inspect # => #<Falqon::Queue name="my_queue" pending=0 processing=0 scheduled=1 dead=0>
sleep 60
queue.pop # => "Hello, World!"

Instance Method Summary

Instance Method Details

#retry(message, error) ⇒ Object



# File 'lib/falqon/strategies/linear.rb', line 36

def retry(message, error)
  queue.redis.with do |r|
    # Increment retry count
    retries = r.hincrby("#{queue.id}:metadata:#{message.id}", :retries, 1)

    r.multi do |t|
      # Set error metadata
      t.hset(
        "#{queue.id}:metadata:#{message.id}",
        :retried_at, Time.now.to_i,
        :retry_error, error.message,
      )

      if retries < queue.max_retries || queue.max_retries == -1
        if queue.retry_delay.positive?
          queue.logger.debug "Scheduling message #{message.id} on queue #{queue.name} in #{queue.retry_delay} seconds (attempt #{retries})"

          # Add identifier to scheduled queue
          queue.scheduled.add(message.id, Time.now.to_i + queue.retry_delay)

          # Set message status
          t.hset("#{queue.id}:metadata:#{message.id}", :status, "scheduled")
        else
          queue.logger.debug "Requeuing message #{message.id} on queue #{queue.name} (attempt #{retries})"

          # Add identifier back to pending queue
          queue.pending.add(message.id)

          # Set message status
          t.hset("#{queue.id}:metadata:#{message.id}", :status, "pending")
        end
      else
        # Kill message after max retries
        message.kill

        # Set message status
        t.hset("#{queue.id}:metadata:#{message.id}", :status, "dead")
      end

      # Remove identifier from processing queue
      queue.processing.remove(message.id)
    end
  end
end
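
To make the counting concrete, the following sketch replays the branch logic of the listing above for a message that fails on every attempt, using an in-memory counter in place of Redis. The `simulate_failures` helper and its keyword arguments are hypothetical and exist only for illustration; the comparison itself is copied from the listing.

```ruby
# Hypothetical illustration, not part of Falqon: replays the branch logic of
# #retry with an in-memory counter instead of Redis.
def simulate_failures(max_retries:, retry_delay:, failures:)
  retries = 0
  outcomes = []

  failures.times do
    # The retry counter is incremented before it is compared to the limit
    retries += 1

    status =
      if retries < max_retries || max_retries == -1
        retry_delay.positive? ? :scheduled : :pending
      else
        :dead
      end

    outcomes << [retries, status]
    break if status == :dead # a dead message is not retried again
  end

  outcomes
end

p simulate_failures(max_retries: 3, retry_delay: 60, failures: 5)
# => [[1, :scheduled], [2, :scheduled], [3, :dead]]
```

Under these assumptions, a queue configured with `max_retries: 3` gives a message three processing attempts in total: the first two failures reschedule it, and the third failure marks it as dead.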