Class: Esse::RedisStorage::Queue
- Inherits:
-
Object
- Object
- Esse::RedisStorage::Queue
- Extended by:
- Forwardable
- Includes:
- Enumerable
- Defined in:
- lib/esse/redis_storage/queue.rb
Constant Summary collapse
- GROUP =
"queue"
- SEPARATOR =
":"
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Class Method Summary collapse
- .batch_id ⇒ Object
- .for(repo:, attribute_name: nil) ⇒ Object
Instance Method Summary collapse
-
#clear ⇒ Object
Clear the queue.
- #delete(batch_id) ⇒ Object
- #each ⇒ Object
-
#enqueue(id: nil, values: [], ttl: nil) ⇒ Object
Enqueue a batch of ids to process.
-
#fetch(batch_id) {|Array<String>| ... } ⇒ Object
Fetch and remove a batch of ids to process from the queue using batch_id.
-
#initialize(name:) ⇒ Queue
constructor
A new instance of Queue.
-
#size ⇒ Integer
Get the size of the queue.
Constructor Details
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
13 14 15 |
# File 'lib/esse/redis_storage/queue.rb', line 13
#
# Reader for the queue name.
#
# @return [Object] the value of the +name+ attribute
def name
  @name
end
Class Method Details
.batch_id ⇒ Object
15 16 17 |
# File 'lib/esse/redis_storage/queue.rb', line 15
#
# Build a new batch identifier: the current local time formatted down to
# milliseconds, a dash, and 8 random hexadecimal characters.
#
# @return [String] a fresh, unfrozen identifier
def self.batch_id
  timestamp_prefix = Time.now.strftime("%Y%m%d%H%M%S%L-")
  random_suffix = SecureRandom.hex(4)
  String.new(timestamp_prefix) << random_suffix
end
.for(repo:, attribute_name: nil) ⇒ Object
19 20 21 22 |
# File 'lib/esse/redis_storage/queue.rb', line 19
#
# Build a queue whose name is derived from a repository.
#
# The name is the repo's index name, the repo name and the optional
# attribute name joined with SEPARATOR; nil parts are skipped.
#
# @param repo [Object] must respond to +index.index_name+ and +repo_name+
# @param attribute_name [Object, nil] optional last segment of the name
# @return [Queue] a new queue instance
def self.for(repo:, attribute_name: nil)
  segments = [repo.index.index_name, repo.repo_name, attribute_name]
  queue_name = segments.compact.join(SEPARATOR)
  new(name: queue_name)
end
Instance Method Details
#clear ⇒ Object
Clear the queue
64 65 66 67 68 |
# File 'lib/esse/redis_storage/queue.rb', line 64
#
# Clear the queue by deleting the key stored under +name+.
#
# @return [Object] whatever +with+ returns for the +del+ call
def clear
  with { |conn| conn.del(name) }
end
#delete(batch_id) ⇒ Object
57 58 59 60 61 |
# File 'lib/esse/redis_storage/queue.rb', line 57
#
# Remove a single batch from the queue without reading it.
#
# @param batch_id [String] hash field identifying the batch
# @return [Object] whatever +with+ returns for the +hdel+ call
def delete(batch_id)
  with { |conn| conn.hdel(name, batch_id) }
end
#each ⇒ Object
78 79 80 81 82 83 84 |
# File 'lib/esse/redis_storage/queue.rb', line 78
#
# Iterate over every batch stored in the queue.
#
# Scans the hash stored under +name+ (asking for 1000 entries per scan
# step) and yields each batch id together with its JSON-decoded values.
#
# @yieldparam batch_id [String] hash field identifying the batch
# @yieldparam values [Object] the decoded payload of the batch
# @return [Enumerator] when no block is given
# @return [Object] whatever +with+ returns when a block is given
def each
  # Without a block the original raised LocalJumpError on the first yield;
  # hand back an Enumerator instead, as Enumerable callers expect.
  return enum_for(:each) unless block_given?

  with do |conn|
    conn.hscan_each(name, count: 1000) do |batch_id, values|
      yield batch_id, ::MultiJson.load(values)
    end
  end
end
#enqueue(id: nil, values: [], ttl: nil) ⇒ Object
Enqueue a batch of ids to process
31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/esse/redis_storage/queue.rb', line 31
#
# Enqueue a batch of ids to process.
#
# The values are wrapped into an array, JSON-encoded and stored in the hash
# at +name+ under the batch id. When a ttl is available, the expiry is set
# on the whole queue key, not on the individual batch.
#
# @param id [String, nil] batch id to use; generated with +batch_id+ when nil
# @param values [Array, Object, nil] ids to enqueue; a single value is wrapped
# @param ttl [Integer, nil] expiry for the queue key; falls back to
#   +Esse.config.redis_queue_ttl+ when nil
# @return [String, nil] the batch id, or nil when there was nothing to enqueue
def enqueue(id: nil, values: [], ttl: nil)
  return if values.nil?
  # Only ask for emptiness when the object supports it: a bare scalar such
  # as an Integer id used to raise NoMethodError here before being wrapped.
  return if values.respond_to?(:empty?) && values.empty?

  ttl ||= Esse.config.redis_queue_ttl
  values = ::Esse::ArrayUtils.wrap(values)
  batch_id = id || self.class.batch_id

  with do |conn|
    conn.hset(name, batch_id, ::MultiJson.dump(values))
    conn.expire(name, ttl) if ttl
  end

  batch_id
end
#fetch(batch_id) {|Array<String>| ... } ⇒ Object
Fetch and remove a batch of ids to process from the queue using batch_id
47 48 49 50 51 52 53 54 55 |
# File 'lib/esse/redis_storage/queue.rb', line 47
#
# Fetch and remove a batch of ids to process from the queue using batch_id.
#
# The batch is removed only after the block has returned, so it stays in
# the queue if the block raises.
#
# @param batch_id [String] hash field identifying the batch
# @yield [Array<String>] the JSON-decoded values of the batch
# @return [nil] when no batch is stored under +batch_id+
# @return [Object] otherwise whatever +with+ returns for the +hdel+ call
def fetch(batch_id)
  with do |conn|
    payload = conn.hget(name, batch_id)
    return unless payload

    yield ::MultiJson.load(payload)
    conn.hdel(name, batch_id)
  end
end
#size ⇒ Integer
Get the size of the queue
72 73 74 75 76 |
# File 'lib/esse/redis_storage/queue.rb', line 72
#
# Get the size of the queue, i.e. the number of fields in the hash stored
# under +name+.
#
# @return [Integer] number of batches currently queued
def size
  with { |conn| conn.hlen(name) }
end