10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
# File 'lib/logstash/queue_factory.rb', line 10
def self.create(settings)
queue_type = settings.get("queue.type")
queue_page_capacity = settings.get("queue.page_capacity")
queue_max_bytes = settings.get("queue.max_bytes")
queue_max_events = settings.get("queue.max_events")
checkpoint_max_acks = settings.get("queue.checkpoint.acks")
checkpoint_max_writes = settings.get("queue.checkpoint.writes")
checkpoint_max_interval = settings.get("queue.checkpoint.interval")
case queue_type
when "memory_acked"
LogStash::Util::WrappedAckedQueue.create_memory_based("", queue_page_capacity, queue_max_events, queue_max_bytes)
when "memory"
LogStash::Util::WrappedSynchronousQueue.new
when "persisted"
queue_path = ::File.join(settings.get("path.queue"), settings.get("pipeline.id"))
FileUtils.mkdir_p(queue_path)
LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes)
else
raise ConfigurationError, "Invalid setting `#{queue_type}` for `queue.type`, supported types are: 'memory_acked', 'memory', 'persisted'"
end
end
|