Class: SimpleQueues::Redis
- Inherits:
-
Object
- Object
- SimpleQueues::Redis
- Defined in:
- lib/simple_queues/redis.rb
Overview
The Redis version of SimpleQueues.
Messages are enqueued to the right, dequeued from the left - thus the most recent messages are at the end of the list.
Instance Attribute Summary collapse
-
#encoder ⇒ Object
Returns the value of attribute encoder.
Instance Method Summary collapse
- #clear(queue_name) ⇒ Object
- #decode(message) ⇒ Object
-
#dequeue_blocking(queue_name) ⇒ String
Dequeues a message, and waits forever for one to arrive.
-
#dequeue_with_timeout(*args) ⇒ String?
Dequeues a message, or returns
nil
if the timeout is exceeded. - #encode(message) ⇒ Object
-
#enqueue(queue_name, message) ⇒ Object
Enqueues a new message to the Redis backend.
-
#initialize(redis = ::Redis.new, options = {}) ⇒ Redis
constructor
A new instance of Redis.
-
#on_dequeue(queue_name, &block) ⇒ Object
Saves a block for later execution from #dequeue_with_timeout or #dequeue.
- #size(queue_name) ⇒ Object
Constructor Details
#initialize(redis = ::Redis.new, options = {}) ⇒ Redis
Returns a new instance of Redis.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/simple_queues/redis.rb', line 15 def initialize(redis = ::Redis.new, ={}) @encoder = case [:encoder] when :messagepack, :msgpack SimpleQueues::MessagePackEncoder.new when :json, nil SimpleQueues::JsonEncoder.new when :identity SimpleQueues::IdentityEncoder.new else # Use whatever was provided [:encoder] end raise ArgumentError, "Provided encoder #{@encoder.inspect} does not handle #encode and #decode" unless @encoder.respond_to?(:encode) && @encoder.respond_to?(:decode) @redis = redis @queues = Hash.new end |
Instance Attribute Details
#encoder ⇒ Object
Returns the value of attribute encoder.
9 10 11 |
# File 'lib/simple_queues/redis.rb', line 9 def encoder @encoder end |
Instance Method Details
#clear(queue_name) ⇒ Object
74 75 76 |
# File 'lib/simple_queues/redis.rb', line 74 def clear(queue_name) @redis.ltrim(q_name(queue_name), 1, 0) end |
#decode(message) ⇒ Object
47 48 49 |
# File 'lib/simple_queues/redis.rb', line 47 def decode() encoder.decode() if end |
#dequeue_blocking(queue_name) ⇒ String
Dequeues a message, and waits forever for one to arrive.
70 71 72 |
# File 'lib/simple_queues/redis.rb', line 70 def dequeue_blocking(queue_name) dequeue_with_timeout(queue_name, 0) end |
#dequeue_with_timeout(*args) ⇒ String?
Dequeues a message, or returns nil
if the timeout is exceeded.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/simple_queues/redis.rb', line 88 def dequeue_with_timeout(*args) case args.length when 1 # Timeout only timeout = args.shift raise ArgumentError, "Timeout must not be nil" if timeout.nil? || timeout.to_s.empty? queue, result = @redis.blpop(*[@queues.keys, timeout.to_i].flatten) if queue then block = @queues.fetch(queue) = decode(result) if block.arity == 1 then block.call() else block.call(queue, ) end end queue when 2 queue_name, timeout = args.shift, args.shift _, result = @redis.blpop(q_name(queue_name), timeout.to_i) decode(result) else raise ArgumentError, "Expected 1 (timeout) or 2 (queue name, timeout) arguments, not #{args.length}" end end |
#encode(message) ⇒ Object
42 43 44 45 |
# File 'lib/simple_queues/redis.rb', line 42 def encode() raise ArgumentError, "message must be non-nil" if .nil? encoder.encode() end |
#enqueue(queue_name, message) ⇒ Object
Enqueues a new message to the Redis backend.
57 58 59 60 61 62 63 |
# File 'lib/simple_queues/redis.rb', line 57 def enqueue(queue_name, ) raise ArgumentError, "Only hashes are accepted as messages" unless .is_a?(Hash) msg = encode() @redis.rpush(q_name(queue_name), msg) msg end |
#on_dequeue(queue_name, &block) ⇒ Object
Saves a block for later execution from #dequeue_with_timeout or #dequeue.
When the block’s arity is 1, only the message will be passed. When the block’s arity is 2, the queue’s name and the message will be passed along, in that order. When the block’s arity is negative (accepts a variable number of arguments), SimpleQueues::Redis behaves as if the block’s arity was 2.
37 38 39 40 |
# File 'lib/simple_queues/redis.rb', line 37 def on_dequeue(queue_name, &block) raise ArgumentError, "The provided block must accept at least one argument - #{block.inspect} accepts no arguments" if block.arity.zero? @queues[q_name(queue_name)] = block end |
#size(queue_name) ⇒ Object
78 79 80 |
# File 'lib/simple_queues/redis.rb', line 78 def size(queue_name) @redis.llen(q_name(queue_name)) end |