Class: Upperkut::Strategies::ScheduledQueue
- Inherits:
-
Base
- Object
- Base
- Upperkut::Strategies::ScheduledQueue
show all
- Includes:
- Util
- Defined in:
- lib/upperkut/strategies/scheduled_queue.rb
Overview
Public: Encapsulates methods required to build a Scheduled Queue. Items are queued, but are only fetched at a specific point in time.
Constant Summary
collapse
- ZPOPBYRANGE =
%(
local score_from = ARGV[1]
local score_to = ARGV[2]
local limit = ARGV[3]
local values = redis.call('zrangebyscore', KEYS[1], score_from, score_to, 'LIMIT', '0', limit)
if table.getn(values) > 0 then
redis.call('zrem', KEYS[1], unpack(values))
end
return values
).freeze
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Util
#decode_json_items, #encode_json_items, #to_underscore
Constructor Details
#initialize(worker, options = {}) ⇒ ScheduledQueue
Returns a new instance of ScheduledQueue.
29
30
31
32
33
34
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 29
# Public: Builds a strategy instance for the given worker.
#
# worker  - the worker object this strategy is attached to; kept in @worker.
# options - settings for the strategy (default: {}); kept as-is in @options.
#
# NOTE(review): initialize_options and setup_redis_pool are defined outside
# this listing. Presumably they read @options, which would explain why it is
# assigned before they run - confirm before reordering these statements.
def initialize(worker, options = {})
@options = options
initialize_options
# The value returned by setup_redis_pool is kept in @redis_pool.
@redis_pool = setup_redis_pool
@worker = worker
end
|
Instance Attribute Details
#options ⇒ Object
Returns the value of attribute options.
27
28
29
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 27
# Public: Reader for @options, which #initialize assigns from its `options`
# argument.
#
# Returns the current value of @options.
def options
@options
end
|
Instance Method Details
#clear ⇒ Object
65
66
67
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 65
# Public: Deletes the Redis entry stored under this strategy's key.
#
# Returns whatever the `redis` helper returns for the block.
def clear
  redis do |connection|
    connection.del(key)
  end
end
|
#fetch_items ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 50
# Public: Pops a batch of items via pop_values and decodes them.
#
# The range passed to pop_values runs from '-inf' up to the current UTC time
# (float seconds, rendered as a String) and is limited to @batch_size.
#
# Returns the result of decode_json_items for the popped values.
def fetch_items
  now = Time.now.utc.to_f.to_s
  range = { value_from: '-inf'.freeze, value_to: now, limit: @batch_size }

  # Assigned inside the block so an un-yielded block leaves the empty default.
  popped = []
  redis { |connection| popped = pop_values(connection, range) }

  decode_json_items(popped)
end
|
#metrics ⇒ Object
69
70
71
72
73
74
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 69
# Public: Collects the queue's current metrics.
#
# Returns a Hash with String keys 'latency' and 'size', holding the values
# returned by the `latency` and `size` helpers respectively.
def metrics
  current_latency = latency
  current_size = size

  { 'latency' => current_latency, 'size' => current_size }
end
|
#process? ⇒ Boolean
76
77
78
79
80
81
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 76
# Public: Tells whether a batch should be processed now.
#
# Asks `size` for the range from '-inf' up to the current UTC time in whole
# seconds and hands that count to fulfill_condition?.
#
# Returns true when fulfill_condition? is truthy, false otherwise.
def process?
  due_count = size('-inf', Time.now.utc.to_i)

  # Coerce to a strict boolean regardless of what fulfill_condition? returns.
  fulfill_condition?(due_count) ? true : false
end
|
#push_items(items = []) ⇒ Object
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 36
# Public: Adds items to the sorted set stored under this strategy's key.
#
# items - a single Hash or a collection of Hashes (default: []). A lone Hash
#         is wrapped in an Array before processing.
#
# Each item goes through ensure_timestamp_attr, then is added with its
# 'timestamp' value as the score and encode_json_item(item) as the member.
#
# Returns false when there is nothing to push, true otherwise.
def push_items(items = [])
  batch = items.is_a?(Hash) ? [items] : items
  return false if batch.empty?

  redis do |connection|
    batch.each do |entry|
      ensure_timestamp_attr(entry)
      connection.zadd(key, entry['timestamp'], encode_json_item(entry))
    end
  end

  true
end
|