Class: Esse::RedisStorage::Queue

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Enumerable
Defined in:
lib/esse/redis_storage/queue.rb

Constant Summary collapse

GROUP =
"queue"
SEPARATOR =
":"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:) ⇒ Queue

Returns a new instance of Queue.



24
25
26
# File 'lib/esse/redis_storage/queue.rb', line 24

# Builds the queue and stores its fully-qualified key in @name.
#
# The key is the storage namespace, the "queue" group and the given name
# joined with SEPARATOR; nil segments are dropped before joining.
#
# @param name [String] queue-specific suffix of the key
def initialize(name:)
  segments = [Esse::RedisStorage::NAMESPACE, GROUP, name].compact
  @name = segments.join(SEPARATOR)
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



13
14
15
# File 'lib/esse/redis_storage/queue.rb', line 13

# Reader for the queue key that the constructor stores in @name.
#
# @return [String] the joined key assigned in #initialize
def name
  @name
end

Class Method Details

.batch_id ⇒ Object



15
16
17
# File 'lib/esse/redis_storage/queue.rb', line 15

# Generates a new batch identifier.
#
# The id is the current time formatted as year..millisecond (17 digits),
# a dash, and 8 random hexadecimal characters.
#
# @return [String] a fresh, mutable identifier string
def self.batch_id
  timestamp = Time.now.strftime("%Y%m%d%H%M%S%L-")
  timestamp + SecureRandom.hex(4)
end

.for(repo:, attribute_name: nil) ⇒ Object



19
20
21
22
# File 'lib/esse/redis_storage/queue.rb', line 19

# Builds a queue whose name is derived from a repository.
#
# The name is made of the repo's index name, the repo name and the optional
# attribute name, joined with SEPARATOR (nil parts are skipped).
#
# @param repo [Object] must respond to +index.index_name+ and +repo_name+
# @param attribute_name [String, nil] optional trailing segment
# @return [Object] the result of +new(name: ...)+
def self.for(repo:, attribute_name: nil)
  parts = [repo.index.index_name, repo.repo_name, attribute_name]
  new(name: parts.compact.join(SEPARATOR))
end

Instance Method Details

#clear ⇒ Object

Clear the queue



64
65
66
67
68
# File 'lib/esse/redis_storage/queue.rb', line 64

# Clears the queue by deleting its key.
#
# @return [Object] whatever +with+ passes back from the +del+ call
def clear
  with { |redis| redis.del(name) }
end

#delete(batch_id) ⇒ Object



57
58
59
60
61
# File 'lib/esse/redis_storage/queue.rb', line 57

# Removes a single batch from the queue without reading it.
#
# @param batch_id [String] hash field of the batch to drop
# @return [Object] whatever +with+ passes back from the +hdel+ call
def delete(batch_id)
  with { |redis| redis.hdel(name, batch_id) }
end

#each ⇒ Object



78
79
80
81
82
83
84
# File 'lib/esse/redis_storage/queue.rb', line 78

# Iterates over every batch stored in the queue.
#
# Batches are read with +hscan_each+ (scan hint of 1000 entries per page) and
# each stored value is decoded with MultiJson before being yielded.
#
# @yield [batch_id, values] the hash field and its decoded payload
# @return [Enumerator] when called without a block
# @return [Object] otherwise, whatever +with+ passes back
def each
  # Without this guard the inner +yield+ raises LocalJumpError when no block
  # is given; returning an Enumerator matches the usual Enumerable contract.
  return enum_for(:each) unless block_given?

  with do |conn|
    conn.hscan_each(name, count: 1000) do |batch_id, values|
      yield batch_id, ::MultiJson.load(values)
    end
  end
end

#enqueue(id: nil, values: [], ttl: nil) ⇒ Object

Enqueue a batch of ids to process

Parameters:

  • id (String) (defaults to: nil)

    The batch id

  • values (Array<String>) (defaults to: [])

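    The values of the batch

  • ttl (defaults to: nil)

    Expiration applied to the queue key after the batch is written; falls back to Esse.config.redis_queue_ttl when omitted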



31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/esse/redis_storage/queue.rb', line 31

# Enqueue a batch of ids to process.
#
# The batch is JSON-encoded and written as one field of the queue hash, then
# the expiration (if any) is applied to the whole queue key.
#
# @param id [String, nil] the batch id; generated with +self.class.batch_id+ when nil
# @param values [Array<String>, Object] the values of the batch; a single
#   value is wrapped into an array by +Esse::ArrayUtils.wrap+
# @param ttl [Object, nil] value handed to +conn.expire+; falls back to
#   +Esse.config.redis_queue_ttl+, and no expiration is set when both are nil
# @return [String, nil] the batch id, or nil when there was nothing to enqueue
def enqueue(id: nil, values: [], ttl: nil)
  # A scalar such as an Integer id has no #empty?; asking it unconditionally
  # raised NoMethodError before the value could be wrapped into an array.
  return if values.nil? || (values.respond_to?(:empty?) && values.empty?)

  ttl ||= Esse.config.redis_queue_ttl
  values = ::Esse::ArrayUtils.wrap(values)
  batch_id = id || self.class.batch_id
  with do |conn|
    conn.hset(name, batch_id, ::MultiJson.dump(values))
    conn.expire(name, ttl) if ttl
  end
  batch_id
end

#fetch(batch_id) {|Array<String>| ... } ⇒ Object

Fetch and remove a batch of ids to process from the queue using batch_id

Parameters:

  • batch_id (String)

    The batch id to fetch

Yields:

  • (Array<String>)

    The values of the batch



47
48
49
50
51
52
53
54
55
# File 'lib/esse/redis_storage/queue.rb', line 47

# Reads one batch, hands its decoded values to the block, then removes it.
#
# The batch is deleted only after the block has returned, so a block that
# raises leaves the batch in the queue.
#
# @param batch_id [String] the batch id to fetch
# @yield [Array<String>] the decoded values of the batch
# @return [nil] when the batch does not exist (the block is not called)
# @return [Object] otherwise, whatever +with+ passes back from +hdel+
def fetch(batch_id)
  with do |redis|
    payload = redis.hget(name, batch_id)
    return unless payload

    ids = ::MultiJson.load(payload)
    yield ids
    redis.hdel(name, batch_id)
  end
end

#size ⇒ Integer

Get the size of the queue

Returns:

  • (Integer)

    The size of the queue



72
73
74
75
76
# File 'lib/esse/redis_storage/queue.rb', line 72

# Number of batches currently stored in the queue hash.
#
# @return [Integer] the +hlen+ of the queue key, as passed back by +with+
def size
  with { |redis| redis.hlen(name) }
end