Class: LogStash::Util::WrappedAckedQueue

Inherits:
Object
Defined in:
lib/logstash/util/wrapped_acked_queue.rb

Overview

Provides specialized constructors. The calling code needs to know which kind of queue it creates, but not the internal implementation (e.g. LogStash::AckedMemoryQueue). Note the use of allocate: this is what new does before it calls initialize. The new method has been made private because there is no default queue implementation. It would be expensive to create a persistent queue in new only to throw it away in favor of a memory-based one directly afterwards, especially in terms of (mmap) memory allocation and proper close sequencing.
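The constructor pattern described above can be sketched in isolation. This is a minimal illustration, not the Logstash implementation: FakeBackend, WrappedSketch, create_memory_like and create_file_like are hypothetical names, and the backend only records that open was called.

```ruby
# Hypothetical stand-in for a queue backend; not part of Logstash.
class FakeBackend
  attr_reader :label, :opened

  def initialize(label)
    @label = label
    @opened = false
  end

  def open
    @opened = true
  end
end

# Hypothetical wrapper showing named constructors built on allocate.
class WrappedSketch
  # There is no default backend, so direct construction is disallowed.
  private_class_method :new

  def self.create_memory_like
    # allocate builds the instance without calling initialize.
    allocate.with_queue(FakeBackend.new("memory"))
  end

  def self.create_file_like
    allocate.with_queue(FakeBackend.new("file"))
  end

  attr_reader :queue

  # Attaches and opens the backend, then returns self so calls can chain.
  def with_queue(queue)
    @queue = queue
    @queue.open
    self
  end
end

wrapped = WrappedSketch.create_memory_like
```

Because only one backend is ever built per wrapper, no throwaway queue has to be opened and closed, which is the cost the overview warns about.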

Defined Under Namespace

Classes: NotImplementedError, QueueClosedError, ReadBatch, ReadClient, WriteBatch, WriteClient

Class Method Details

.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes) ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 28

def self.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedQueue.new(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  )
end

.create_memory_based(path, capacity, max_events, max_bytes) ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 22

def self.create_memory_based(path, capacity, max_events, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedMemoryQueue.new(path, capacity, max_events, max_bytes)
  )
end

Instance Method Details

#check_closed(action) ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 94

def check_closed(action)
  if closed?
    raise QueueClosedError.new("Attempted to #{action} on a closed AckedQueue")
  end
end

#close ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 100

def close
  @queue.close
  @closed.make_true
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/logstash/util/wrapped_acked_queue.rb', line 43

def closed?
  @closed.true?
end
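The interaction between close, closed? and check_closed can be illustrated with a small self-contained sketch. GuardedQueueSketch is a hypothetical class, and a plain boolean stands in for Concurrent::AtomicBoolean here, so unlike the original this version is not thread-safe.

```ruby
# Hypothetical illustration of the closed-guard pattern; not the Logstash class.
class GuardedQueueSketch
  class QueueClosedError < StandardError; end

  def initialize
    @items = []
    # Assumption: a plain boolean replaces Concurrent::AtomicBoolean (not thread-safe).
    @closed = false
  end

  def closed?
    @closed
  end

  def close
    @closed = true
  end

  # Raises if the queue has been closed, naming the attempted action.
  def check_closed(action)
    raise QueueClosedError, "Attempted to #{action} on a closed queue" if closed?
  end

  def push(obj)
    check_closed("write")
    @items << obj
  end
end

queue = GuardedQueueSketch.new
queue.push(:event)
queue.close

# Writing after close raises; capture the message for inspection.
message =
  begin
    queue.push(:late_event)
    nil
  rescue GuardedQueueSketch::QueueClosedError => e
    e.message
  end
```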

#offer(obj, timeout_ms) ⇒ Boolean

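TODO: fix the documentation for this no-op method. Intended to offer an object to the queue and wait up to the specified amount of time, returning true if the object was added and false otherwise. As currently written, it always raises NotImplementedError because there is no non-blocking write operation yet.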

Parameters:

  • obj (Object)

    the object to add to the queue

  • timeout_ms (Integer)

    time in milliseconds to wait before giving up

Returns:

  • (Boolean)

    true if adding was successful, false otherwise

Raises:

  • (NotImplementedError)

    always, because no non-blocking write operation exists yet

# File 'lib/logstash/util/wrapped_acked_queue.rb', line 64

def offer(obj, timeout_ms)
  raise NotImplementedError.new("The offer method is not implemented. There is no non blocking write operation yet.")
end

#poll(millis) ⇒ Object

Blocks for up to millis milliseconds while reading a single element from the queue.



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 76

def poll(millis)
  check_closed("read")
  @queue.read_batch(1, millis).get_elements.first
end
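How poll reduces a batch read to a single element can be sketched with stand-ins. BatchSketch, BackendSketch and PollSketch are hypothetical; the sketch assumes get_elements returns an Array-like collection that is empty when nothing was read, which this page does not confirm for the real batch class.

```ruby
# Hypothetical batch: wraps an array and exposes it through get_elements.
BatchSketch = Struct.new(:elements) do
  def get_elements
    elements
  end
end

# Hypothetical backend: ignores the timeout and returns immediately.
class BackendSketch
  def initialize(items)
    @items = items
  end

  def read_batch(size, _timeout_ms)
    # Array#shift(n) removes and returns up to n items ([] when empty).
    BatchSketch.new(@items.shift(size))
  end
end

class PollSketch
  def initialize(backend)
    @queue = backend
  end

  # Reads a batch of one and unwraps it; nil when the batch is empty.
  def poll(millis)
    @queue.read_batch(1, millis).get_elements.first
  end
end

reader = PollSketch.new(BackendSketch.new([:a, :b]))
first = reader.poll(50)
second = reader.poll(50)
third = reader.poll(50) # nothing left, so the empty batch yields nil
```

Under that assumption, a caller has to treat nil as "nothing arrived within the timeout" rather than as an error.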

#push(obj) ⇒ Object Also known as: <<

Pushes an object to the queue. If the queue is full, it blocks until the object can be added.

Parameters:

  • obj (Object)

    the object to add to the queue



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 51

def push(obj)
  check_closed("write")
  @queue.write(obj)
end

#read_batch(size, wait) ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 81

def read_batch(size, wait)
  check_closed("read a batch")
  @queue.read_batch(size, wait)
end

#read_client ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 90

def read_client()
  ReadClient.new(self)
end

#take ⇒ Object

Blocking read of a single element. The underlying batch read currently uses an arbitrary 200 ms timeout.



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 69

def take
  check_closed("read a batch")
  # TODO - determine better arbitrary timeout millis
  @queue.read_batch(1, 200).get_elements.first
end

#with_queue(queue) ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 36

def with_queue(queue)
  @queue = queue
  @queue.open
  @closed = Concurrent::AtomicBoolean.new(false)
  self
end

#write_client ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 86

def write_client
  WriteClient.new(self)
end