Class: LogCourier::EventQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/log-courier/event_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(max) ⇒ EventQueue

Creates a fixed-length queue with a maximum size of max.

Raises:

  • (ArgumentError)


35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/log-courier/event_queue.rb', line 35

# Creates a fixed-length queue that holds at most +max+ items.
#
# @param max [Integer] maximum number of queued items; must be positive
# @raise [ArgumentError] if +max+ is not greater than zero
def initialize(max)
  raise ArgumentError, "queue size must be positive" unless max > 0

  @max = max
  @que = []

  # Guards @que, @max and the waiter counters.
  @mutex = Mutex.new

  # Threads blocked in _pop_timeout wait on @cond (signalled by push).
  @cond = ConditionVariable.new
  @num_waiting = 0

  # Threads blocked in push wait on @enque_cond (signalled by pop and max=).
  @enque_cond = ConditionVariable.new
  @num_enqueue_waiting = 0

  # NOTE: the former `@que.taint` / `self.taint` calls were removed on purpose.
  # Object#taint was deprecated in Ruby 2.7 and removed in Ruby 3.2, where
  # calling it raises NoMethodError and made the queue impossible to build.
end

Instance Method Details

#_pop_timeout(timeout = nil) ⇒ Object

Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue or, if a timeout is set, until timeout seconds pass. If timeout is 0, the thread is not suspended and a TimeoutError is raised when the queue is empty.



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/log-courier/event_queue.rb', line 131

# Removes and returns the item at the head of the queue.
#
# When the queue is empty the calling thread waits on @cond until an item
# is available or the timeout elapses.
#
# The timeout is tracked as a single deadline on the monotonic clock, so:
# * a wake-up that finds no data only waits for the time still remaining
#   instead of restarting the full timeout;
# * an item that is available is always returned, even if the deadline has
#   just passed;
# * wall-clock adjustments do not shorten or lengthen the wait.
#
# @param timeout [Numeric, nil] maximum seconds to wait; nil waits
#   indefinitely, 0 (or a negative value) never waits
# @return [Object] the item removed from the queue
# @raise [TimeoutError] if the queue is still empty once the timeout elapsed
def _pop_timeout(timeout = nil)
  deadline = timeout.nil? ? nil : Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  @mutex.synchronize do
    loop do
      return @que.shift unless @que.empty?

      # nil means "wait without a limit" for ConditionVariable#wait.
      remaining = nil
      unless deadline.nil?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise TimeoutError if remaining <= 0
      end

      begin
        @num_waiting += 1
        @cond.wait @mutex, remaining
      ensure
        # Keep the waiter count accurate even if the wait is interrupted.
        @num_waiting -= 1
      end
    end
  end
end

#clear ⇒ Object

Removes all objects from the queue.



170
171
172
173
# File 'lib/log-courier/event_queue.rb', line 170

# Removes all objects from the queue.
#
# The queue is emptied while holding @mutex, like every other method that
# mutates @que. Clearing frees capacity, so all threads blocked in push are
# woken afterwards; each of them re-checks the available space itself.
#
# @return [self]
def clear
  @mutex.synchronize do
    @que.clear
    @enque_cond.broadcast
  end
  self
end

#empty? ⇒ Boolean

Returns true if the queue is empty.

Returns:

  • (Boolean)


163
164
165
# File 'lib/log-courier/event_queue.rb', line 163

# Reports whether the queue currently holds no items.
#
# @return [Boolean] true when nothing is queued
def empty?
  @que.length.zero?
end

#length ⇒ Object Also known as: size

Returns the length of the queue.



178
179
180
# File 'lib/log-courier/event_queue.rb', line 178

# Counts the items currently held in the queue.
#
# @return [Integer] number of queued items
def length
  @que.size
end

#max ⇒ Object

Returns the maximum size of the queue.



52
53
54
# File 'lib/log-courier/event_queue.rb', line 52

# Returns the maximum size of the queue.
#
# This is the current value of @max, as set by the constructor or by max=.
#
# @return [Object] the stored maximum size
def max
  @max
end

#max=(max) ⇒ Object

Sets the maximum size of the queue.

Raises:

  • (ArgumentError)


59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/log-courier/event_queue.rb', line 59

# Changes the maximum size of the queue.
#
# When the limit grows, @enque_cond is signalled once per newly available
# slot. Shrinking the limit only stores the new value.
#
# @param max [Integer] new maximum size; must be positive
# @return [Integer] the value that was passed in
# @raise [ArgumentError] if +max+ is not greater than zero
def max=(max)
  raise ArgumentError, "queue size must be positive" unless max > 0

  @mutex.synchronize do
    previous = @max
    @max = max
    # One signal per extra slot, and only when the limit actually grew.
    (max - previous).times { @enque_cond.signal } if max > previous
  end
  max
end

#num_waiting ⇒ Object

Returns the number of threads waiting on the queue.



190
191
192
# File 'lib/log-courier/event_queue.rb', line 190

# Counts the threads currently blocked on the queue, in either direction.
#
# @return [Integer] waiting poppers plus waiting pushers
def num_waiting
  blocked_poppers = @num_waiting
  blocked_pushers = @num_enqueue_waiting
  blocked_poppers + blocked_pushers
end

#pop(*args) ⇒ Object Also known as: shift, deq

Retrieves data from the queue and runs a waiting thread, if any.



115
116
117
118
119
120
121
122
123
# File 'lib/log-courier/event_queue.rb', line 115

# Retrieves an item from the queue and, if the queue is now below its
# maximum size, signals one thread waiting to push.
#
# All arguments are forwarded unchanged to _pop_timeout.
#
# @param args [Array] arguments for _pop_timeout (an optional timeout)
# @return [Object] the item removed from the queue
def pop(*args)
  item = _pop_timeout(*args)
  @mutex.synchronize { @enque_cond.signal if @que.length < @max }
  item
end

#push(obj, timeout = nil) ⇒ Object Also known as: <<, enq

Pushes obj to the queue. If there is no space left in the queue, waits until space becomes available, up to a maximum of timeout seconds.



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/log-courier/event_queue.rb', line 80

# Appends +obj+ to the queue.
#
# When the queue is full the calling thread waits on @enque_cond until space
# becomes available or the timeout elapses. After the item is stored, one
# thread waiting to pop is signalled through @cond.
#
# The timeout is tracked as a single deadline on the monotonic clock, so:
# * a wake-up that finds the queue still full only waits for the time that
#   remains instead of restarting the full timeout;
# * the item is always stored when space is available, even if the deadline
#   has just passed;
# * wall-clock adjustments do not shorten or lengthen the wait.
#
# @param obj [Object] the item to enqueue
# @param timeout [Numeric, nil] maximum seconds to wait for space; nil waits
#   indefinitely, 0 (or a negative value) never waits
# @return [self]
# @raise [TimeoutError] if the queue is still full once the timeout elapsed
def push(obj, timeout = nil)
  deadline = timeout.nil? ? nil : Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  @mutex.synchronize do
    while @que.length >= @max
      # nil means "wait without a limit" for ConditionVariable#wait.
      remaining = nil
      unless deadline.nil?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise TimeoutError if remaining <= 0
      end

      @num_enqueue_waiting += 1
      begin
        @enque_cond.wait @mutex, remaining
      ensure
        # Keep the waiter count accurate even if the wait is interrupted.
        @num_enqueue_waiting -= 1
      end
    end

    @que.push obj
    @cond.signal
  end
  self
end