Class: Upperkut::Strategies::ScheduledQueue
- Inherits:
-
Base
- Object
- Base
- Upperkut::Strategies::ScheduledQueue
show all
- Includes:
- Util
- Defined in:
- lib/upperkut/strategies/scheduled_queue.rb
Overview
Public: Encapsulates methods required to build a Scheculed Queue Items are queued, but are only fetched at a specific point in time.
Constant Summary
collapse
- ZPOPBYRANGE =
%(
local score_from = ARGV[1]
local score_to = ARGV[2]
local limit = ARGV[3]
local values = redis.call('zrangebyscore', KEYS[1], score_from, score_to, 'LIMIT', '0', limit)
if table.getn(values) > 0 then
redis.call('zrem', KEYS[1], unpack(values))
end
return values
).freeze
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Util
#decode_json_items, #encode_json_items, #normalize_items, #retry_block, #to_underscore
Constructor Details
#initialize(worker, options = {}) ⇒ ScheduledQueue
Returns a new instance of ScheduledQueue.
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 29
def initialize(worker, options = {})
@options = options
@redis_options = @options.fetch(:redis, {})
@worker = worker
@batch_size = @options.fetch(
:batch_size,
Integer(ENV['UPPERKUT_BATCH_SIZE'] || 1000)
)
end
|
Instance Attribute Details
#options ⇒ Object
Returns the value of attribute options.
27
28
29
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 27
def options
@options
end
|
Instance Method Details
#ack(_items) ⇒ Object
74
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 74
def ack(_items); end
|
#clear ⇒ Object
70
71
72
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 70
def clear
redis { |conn| conn.del(key) }
end
|
#fetch_items ⇒ Object
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 55
def fetch_items
args = {
value_from: '-inf'.freeze,
value_to: Time.now.utc.to_f.to_s,
limit: @batch_size
}
items = []
redis do |conn|
items = pop_values(conn, args)
end
decode_json_items(items)
end
|
#metrics ⇒ Object
80
81
82
83
84
85
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 80
def metrics
{
'latency' => latency,
'size' => size
}
end
|
#nack(items) ⇒ Object
76
77
78
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 76
def nack(items)
push_items(items)
end
|
#process? ⇒ Boolean
87
88
89
90
91
92
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 87
def process?
buff_size = size('-inf', Time.now.utc.to_i)
return true if fulfill_condition?(buff_size)
false
end
|
#push_items(items = []) ⇒ Object
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/upperkut/strategies/scheduled_queue.rb', line 40
def push_items(items = [])
items = normalize_items(items)
return false if items.empty?
redis do |conn|
items.each do |item|
schedule_item = ensure_timestamp_attr(item)
timestamp = schedule_item.body['timestamp']
conn.zadd(key, timestamp, encode_json_items(schedule_item))
end
end
true
end
|