Class: LogStash::Util::WrappedAckedQueue
- Inherits:
-
Object
- Object
- LogStash::Util::WrappedAckedQueue
show all
- Defined in:
- lib/logstash/util/wrapped_acked_queue.rb
Overview
Some specialized constructors. The calling code does need to know what kind it creates but not the internal implementation e.g. LogStash::AckedMemoryQueue etc. Note the use of allocate - this is what new does before it calls initialize. Note that the new method has been made private this is because there is no default queue implementation. It would be expensive to create a persistent queue in the new method to then throw it away in favor of a memory based one directly after. Especially in terms of (mmap) memory allocation and proper close sequencing.
Defined Under Namespace
Classes: NotImplementedError, QueueClosedError, ReadBatch, ReadClient, WriteBatch, WriteClient
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes) ⇒ Object
28
29
30
31
32
|
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 28
def self.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
self.allocate.with_queue(
LogStash::AckedQueue.new(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
)
end
|
.create_memory_based(path, capacity, max_events, max_bytes) ⇒ Object
22
23
24
25
26
|
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 22
def self.create_memory_based(path, capacity, max_events, max_bytes)
self.allocate.with_queue(
LogStash::AckedMemoryQueue.new(path, capacity, max_events, max_bytes)
)
end
|
Instance Method Details
#check_closed(action) ⇒ Object
94
95
96
97
98
|
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 94
def check_closed(action)
if closed?
raise QueueClosedError.new("Attempted to #{action} on a closed AckedQueue")
end
end
|
#close ⇒ Object
100
101
102
103
|
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 100
def close
@queue.close
@closed.make_true
end
|
#closed? ⇒ Boolean
43
44
45
|
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 43
def closed?
@closed.true?
end
|
#offer(obj, timeout_ms) ⇒ Boolean
TODO - fix doc for this noop method Offer an object to the queue, wait for the specified amount of time. If adding to the queue was successful it will return true, false otherwise.
64
65
66
|
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 64
def offer(obj, timeout_ms)
raise NotImplementedError.new("The offer method is not implemented. There is no non blocking write operation yet.")
end
|
#poll(millis) ⇒ Object
76
77
78
79
|
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 76
def poll(millis)
check_closed("read")
@queue.read_batch(1, millis).get_elements.first
end
|
#push(obj) ⇒ Object
Also known as:
<<
Push an object to the queue if the queue is full it will block until the object can be added to the queue.
51
52
53
54
|
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 51
def push(obj)
check_closed("write")
@queue.write(obj)
end
|
#read_batch(size, wait) ⇒ Object
81
82
83
84
|
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 81
def read_batch(size, wait)
check_closed("read a batch")
@queue.read_batch(size, wait)
end
|
#read_client ⇒ Object
90
91
92
|
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 90
def read_client()
ReadClient.new(self)
end
|
#take ⇒ Object
69
70
71
72
73
|
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 69
def take
check_closed("read a batch")
@queue.read_batch(1, 200).get_elements.first
end
|
#with_queue(queue) ⇒ Object
36
37
38
39
40
41
|
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 36
def with_queue(queue)
@queue = queue
@queue.open
@closed = Concurrent::AtomicBoolean.new(false)
self
end
|
#write_client ⇒ Object
86
87
88
|
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 86
def write_client
WriteClient.new(self)
end
|