Class: Upperkut::Strategies::BufferedQueue
- Inherits:
-
Base
- Object
- Base
- Upperkut::Strategies::BufferedQueue
show all
- Includes:
- Util
- Defined in:
- lib/upperkut/strategies/buffered_queue.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Util
#decode_json_items, #encode_json_items, #to_underscore
Constructor Details
#initialize(worker, options = {}) ⇒ BufferedQueue
Returns a new instance of BufferedQueue.
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
# File 'lib/upperkut/strategies/buffered_queue.rb', line 12
def initialize(worker, options = {})
@options = options
@redis_options = options.fetch(:redis, {})
@redis_pool = setup_redis_pool
@worker = worker
@max_wait = options.fetch(
:max_wait,
Integer(ENV['UPPERKUT_MAX_WAIT'] || 20)
)
@batch_size = options.fetch(
:batch_size,
Integer(ENV['UPPERKUT_BATCH_SIZE'] || 1000)
)
@waiting_time = 0
end
|
Instance Attribute Details
#options ⇒ Object
Returns the value of attribute options.
10
11
12
|
# File 'lib/upperkut/strategies/buffered_queue.rb', line 10
def options
@options
end
|
Instance Method Details
#clear ⇒ Object
53
54
55
|
# File 'lib/upperkut/strategies/buffered_queue.rb', line 53
def clear
redis { |conn| conn.del(key) }
end
|
#fetch_items ⇒ Object
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/upperkut/strategies/buffered_queue.rb', line 41
def fetch_items
stop = [@batch_size, size].min
items = redis do |conn|
conn.multi do
stop.times { conn.lpop(key) }
end
end
decode_json_items(items)
end
|
#metrics ⇒ Object
57
58
59
60
61
62
|
# File 'lib/upperkut/strategies/buffered_queue.rb', line 57
def metrics
{
'latency' => latency,
'size' => size
}
end
|
#process? ⇒ Boolean
64
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/upperkut/strategies/buffered_queue.rb', line 64
def process?
buff_size = size
if fulfill_condition?(buff_size)
@waiting_time = 0
return true
else
@waiting_time += @worker.setup.polling_interval
return false
end
end
|
#push_items(items = []) ⇒ Object
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/upperkut/strategies/buffered_queue.rb', line 30
def push_items(items = [])
items = [items] if items.is_a?(Hash)
return false if items.empty?
redis do |conn|
conn.rpush(key, encode_json_items(items))
end
true
end
|