Class: MobME::Infrastructure::Queue::Backends::Redis
- Inherits:
- MobME::Infrastructure::Queue::Backend
- Inheritance chain:
- Object → MobME::Infrastructure::Queue::Backend → MobME::Infrastructure::Queue::Backends::Redis
- Defined in:
- lib/mobme/infrastructure/queue/backends/redis.rb
Constant Summary
- NAMESPACE = 'redis:queue:'
The namespace prefix under which all queue keys live inside Redis.
- QUEUESET = 'redis:queue:set'
The set used to store the names of all queues.
- UUID_SUFFIX = ':uuid'
The UUID suffix for keys that store values.
- QUEUE_SUFFIX = ':queue'
The suffix for the sorted set that lists all keys in a queue.
- VALUE_SUFFIX = ':values'
The suffix for the hash that stores the values of a queue.
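As an illustration of how these constants combine, the sketch below builds per-queue key names with the same concatenation that appears in the method source further down this page (`NAMESPACE + queue.to_s + QUEUE_SUFFIX`). The constants are redefined locally so the snippet runs without the gem. The module name `QueueKeySketch`, the helpers `queue_key` and `value_key`, and the queue name `emails` are hypothetical. The UUID key is left out because its exact layout is not shown here.

```ruby
# Local copies of the documented constants, so the sketch runs on its own.
module QueueKeySketch
  NAMESPACE    = 'redis:queue:'
  QUEUE_SUFFIX = ':queue'
  VALUE_SUFFIX = ':values'

  module_function

  # Hypothetical helper: key of the sorted set that orders a queue's items.
  def queue_key(queue)
    NAMESPACE + queue.to_s + QUEUE_SUFFIX
  end

  # Hypothetical helper: key of the hash that stores a queue's item values.
  def value_key(queue)
    "#{NAMESPACE}#{queue}#{VALUE_SUFFIX}"
  end
end

puts QueueKeySketch.queue_key(:emails)  # redis:queue:emails:queue
puts QueueKeySketch.value_key(:emails)  # redis:queue:emails:values
```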
Instance Method Summary
-
#add(queue, item, metadata = {}) ⇒ String
Add a value to a queue.
-
#add_bulk(queue, items = []) ⇒ Object
Add values to the queue in bulk. This works by pipelining writes to Redis, so it is generally much faster than adding items one at a time.
-
#connect(options) ⇒ Object
Connect to Redis.
-
#empty(queue) ⇒ Object
Clear the queue.
-
#initialize(options = {}) ⇒ Redis
constructor
Initialises the Queue.
-
#list(queue) ⇒ Array<Object, Hash>
Lists all items in the queue.
-
#list_queues ⇒ Array
List all queues.
-
#peek(queue) ⇒ [Object, Hash]
Peek into the first item in a queue without removing it.
-
#remove(queue) {|[Object, Hash]| ... } ⇒ [Object, Hash]
Remove an item from a queue.
-
#remove_queues(*queues) ⇒ Object
(also: #remove_queue)
Delete queues.
-
#size(queue) ⇒ Integer
Find the size of a queue.
Constructor Details
#initialize(options = {}) ⇒ Redis
Initialises the Queue
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 23
def initialize(options = {})
  = .delete(:redis_options) || {}

  # Connect to Redis!
  connect()
end
Instance Method Details
#add(queue, item, metadata = {}) ⇒ String
Add a value to a queue
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 45
def add(queue, item, metadata = {})
  raise ArgumentError, "Metadata must be a hash, but #{metadata.class} given" unless metadata.is_a? Hash

  = ()
  uuid = generate_uuid(queue)

  add_to_queueset(queue)
  write_value(queue, uuid, item, metadata)
  add_to_queue(queue, uuid, metadata['dequeue-timestamp'], metadata['priority'])
  uuid
end
#add_bulk(queue, items = []) ⇒ Object
Add values to the queue in bulk. This works by pipelining writes to Redis, so it is generally much faster than adding items one at a time.
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 62
def add_bulk(queue, items = [])
  = {}

  # UUIDs have to be in sync!
  uuids = []
  items.each do |item|
    uuids << generate_uuid(queue)
  end

  add_to_queueset(queue)
  @redis.pipelined do
    items.each do |item|
      uuid = uuids.shift

      # write value
      value_hash = "#{NAMESPACE}#{queue}#{VALUE_SUFFIX}"
      @redis.hset value_hash, uuid, serialize_item(item, )

      # add to queue
      queue_key = NAMESPACE + queue.to_s + QUEUE_SUFFIX
      @redis.zadd queue_key, (['dequeue_timestamp'], ['priority']), uuid
    end
  end
end
#connect(options) ⇒ Object
Connect to Redis
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 33
def connect(options)
  @redis = options.delete(:connection)
  @redis ||= Redis.new(options)
end
#empty(queue) ⇒ Object
Clear the queue
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 167
def empty(queue)
  # Delete key and value stores.
  @redis.del "#{NAMESPACE}#{queue}#{VALUE_SUFFIX}"
  @redis.del "#{NAMESPACE}#{queue}#{QUEUE_SUFFIX}"
end
#list(queue) ⇒ Array<Object, Hash>
Lists all items in the queue. This is an expensive operation, because every item is read back from Redis in batches of 1,000.
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 151
def list(queue)
  batch_size = 1_000 # keep this low as the time complexity of zrangebyscore is O(log(N)+M) : M -> the size
  count = 0; values = []

  (size(queue)/batch_size + 1).times do |i|
    limit = [(batch_size * i), batch_size]
    uuids = range_in_queue(queue, limit)
    batch_values = uuids.map { |uuid| read_value(queue, uuid) }
    values.push(*batch_values)
  end

  values
end
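The batching arithmetic in `#list` can be checked in isolation. The sketch below reproduces only the loop bounds and the `limit` pairs. It assumes that `range_in_queue` treats the pair as `[offset, count]`, which is how the `LIMIT` clause of Redis `ZRANGEBYSCORE` works, but that helper's source is not shown on this page. The name `batch_windows` is hypothetical.

```ruby
# Hypothetical helper mirroring the loop bounds used by #list.
# Returns the [offset, count] pair requested for each batch.
def batch_windows(size, batch_size = 1_000)
  (size / batch_size + 1).times.map do |i|
    [batch_size * i, batch_size]
  end
end

p batch_windows(2_500)
# => [[0, 1000], [1000, 1000], [2000, 1000]]

p batch_windows(2_000)
# => [[0, 1000], [1000, 1000], [2000, 1000]]
```

Because of the `+ 1`, a queue whose size is an exact multiple of the batch size still gets one extra request, which simply returns no items.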
#list_queues ⇒ Array
List all queues
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 175
def list_queues
  @redis.smembers QUEUESET
end
#peek(queue) ⇒ [Object, Hash]
Peek into the first item in a queue without removing it
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 135
def peek(queue)
  uuid = first_in_queue(queue)
  read_value(queue, uuid)
end
#remove(queue) {|[Object, Hash]| ... } ⇒ [Object, Hash]
Remove an item from a queue. When a block is passed, the item is reserved instead and is automatically put back if the block raises an error. Raise MobME::Infrastructure::Queue::RemoveAbort within the block to abort the remove manually.
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 95
def remove(queue, &block)
  begin
    # Remove the first item!
    uuid = first_in_queue(queue)

    if uuid
      # If we're not able to remove the key from the set here, it means that
      # some other thread (or evented operation) has done it before us, so
      # the current remove is invalid and we should retry!
      raise MobME::Infrastructure::Queue::RemoveConflictException unless remove_from_queue(queue, uuid)

      queue_item = read_value(queue, uuid)

      # When a block is given, safely reserve the queue item
      if block_given?
        begin
          block.call(queue_item)
          remove_value(queue, uuid)
        rescue #generic error
          put_back_in_queue(queue, uuid, queue_item)

          # And now re-raise the error
          raise
        rescue MobME::Infrastructure::Queue::RemoveAbort
          put_back_in_queue(queue, uuid, queue_item)
        end
      else
        remove_value(queue, uuid)
        queue_item
      end
    else
      nil
    end
  rescue MobME::Infrastructure::Queue::RemoveConflictException
    retry
  end
end
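The reserve-and-put-back behaviour described for `#remove` can be illustrated without Redis. The sketch below is a minimal in-memory stand-in, not the library's implementation: the class `InMemoryQueue` and the local `RemoveAbort` exception are hypothetical, and the return values of the block form are an assumption. The specific rescue is listed first here, because Ruby tries rescue clauses in order and a `StandardError` clause would otherwise also match a `StandardError` subclass.

```ruby
# Stand-in for the library's abort exception; hypothetical for this sketch.
class RemoveAbort < StandardError; end

# Hypothetical in-memory queue that mimics the documented block semantics.
class InMemoryQueue
  def initialize
    @items = []
  end

  def add(item)
    @items.push(item)
  end

  def size
    @items.size
  end

  def remove
    return nil if @items.empty?

    item = @items.shift
    return item unless block_given?

    begin
      yield item
      item
    rescue RemoveAbort
      # Manual abort: put the item back and swallow the exception.
      @items.unshift(item)
      nil
    rescue StandardError
      # Any other error: put the item back, then re-raise.
      @items.unshift(item)
      raise
    end
  end
end

queue = InMemoryQueue.new
queue.add('job-1')

queue.remove { |_item| raise RemoveAbort } # aborted, item stays queued
puts queue.size                            # 1

begin
  queue.remove { |_item| raise 'boom' }    # error, item is put back
rescue RuntimeError
  puts queue.size                          # 1
end

queue.remove { |item| puts "processed #{item}" }
puts queue.size                            # 0
```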
#remove_queues(*queues) ⇒ Object Also known as: remove_queue
Delete queues
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 181
def remove_queues(*queues)
  queues = list_queues if queues.empty?
  queues.each do |queue_name|
    empty(queue_name)
    remove_from_queueset(queue_name)
  end
end
#size(queue) ⇒ Integer
Find the size of a queue
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 143
def size(queue)
  queue = NAMESPACE + queue.to_s + QUEUE_SUFFIX
  length = (@redis.zcard queue)
end