Class: BBFS::Log::BufferConsumerProducer

Inherits:
Consumer
  • Object
Defined in:
lib/log/log_consumer.rb

Overview

BufferConsumerProducer acts as both a consumer and a producer. It has its own consumers, which are added to it with #add_consumer. It saves all the data it consumes in a buffer that has a size limit and a time limit. When either limit is reached, it flushes the buffer to its own consumers.
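
The behavior described above can be sketched without threads as follows. This is a minimal illustration under stated assumptions, not the BBFS implementation: SketchBuffer, ListSink, and the injectable clock are hypothetical names, and the buffer size is estimated from the length of the buffer's inspect string.

```ruby
# Hypothetical sink that records every batch it receives.
class ListSink
  attr_reader :batches

  def initialize
    @batches = []
  end

  def push_data(batch)
    @batches << batch
  end
end

# Hypothetical, single-threaded sketch of a size- and time-limited buffer.
class SketchBuffer
  def initialize(size_limit_in_bytes, time_out_in_seconds, clock = -> { Time.now.to_i })
    @size_limit_in_bytes = size_limit_in_bytes
    @time_out_in_seconds = time_out_in_seconds
    @clock = clock
    @time_at_last_flush = @clock.call
    @buffer = []
    @consumers = []
  end

  def add_consumer(consumer)
    @consumers << consumer
  end

  # Buffers data (nil only triggers the limit checks) and flushes
  # when either limit is reached.
  def consume(data)
    @buffer << data unless data.nil?
    too_big = @buffer.inspect.size >= @size_limit_in_bytes
    too_old = (@clock.call - @time_at_last_flush) >= @time_out_in_seconds
    flush if too_big || too_old
  end

  private

  # Hands each consumer its own copy of the buffer, then empties it.
  def flush
    @consumers.each { |consumer| consumer.push_data(@buffer.dup) }
    @buffer.clear
    @time_at_last_flush = @clock.call
  end
end

sink = ListSink.new
buffer = SketchBuffer.new(20, 3600)
buffer.add_consumer(sink)
buffer.consume('first')   # inspect string is 9 characters, below the limit
buffer.consume('second')  # 19 characters, still below the limit
buffer.consume('third')   # 28 characters, so the buffer is flushed
```

After the third call, sink.batches holds a single batch with all three entries. Passing the clock as a lambda is a choice made for this sketch so that the time limit can be exercised without waiting.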

Instance Method Summary

Methods inherited from Consumer

#push_data

Constructor Details

#initialize(buffer_size_in_mega_bytes, buffer_time_out_in_seconds) ⇒ BufferConsumerProducer

Returns a new instance of BufferConsumerProducer.



# File 'lib/log/log_consumer.rb', line 40

def initialize buffer_size_in_mega_bytes, buffer_time_out_in_seconds
  super()
  @buffer_size_in_bytes = buffer_size_in_mega_bytes * 1000000
  @buffer_time_out_in_seconds = buffer_time_out_in_seconds
  @time_at_last_flush = Time.now.to_i
  @buffer = []
  @consumers = []
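  # Heartbeat thread: when the queue is empty, push nil so that consume
  # still runs and the time limit is checked even if no new data arrives.
  Thread.new do
    while (true)
      if @consumer_queue.empty? then
        @consumer_queue.push nil
        sleep Params['log_param_thread_sleep_time_in_seconds']
      end
    end
  end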
end
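
The background thread in the constructor appears to act as a heartbeat: a nil entry carries no data, but it still causes the time limit to be checked. The sketch below illustrates that idea with the standard library Queue. It assumes @consumer_queue is a queue created by the Consumer superclass; tick_interval is a hypothetical stand-in for Params['log_param_thread_sleep_time_in_seconds']. Unlike the listing above, this sketch sleeps on every pass, so the loop does not spin while the queue is non-empty.

```ruby
require 'thread' # no-op on current Rubies, where Queue is built in

queue = Queue.new
tick_interval = 0.01 # hypothetical stand-in for the configured sleep time

# Heartbeat: enqueue nil as a "check the limits" tick whenever the queue is idle.
heartbeat = Thread.new do
  loop do
    queue.push(nil) if queue.empty?
    sleep tick_interval
  end
end

first_item = queue.pop # blocks until the heartbeat delivers a tick
heartbeat.kill
heartbeat.join
```

Here first_item is the nil tick, which a consumer would treat as "no new data, but check whether the time limit has passed".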

Instance Method Details

#add_consumer(consumer) ⇒ Object



# File 'lib/log/log_consumer.rb', line 57

def add_consumer consumer
  @consumers.push consumer
end

#consume(data) ⇒ Object



# File 'lib/log/log_consumer.rb', line 61

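def consume data
  # nil is a heartbeat tick: nothing is buffered, but the limits are still checked
  @buffer.push data if not data.nil?
  # the buffer size is estimated from the length of its inspect string
  if (@buffer.inspect.size >= @buffer_size_in_bytes) or
      ((Time.now.to_i - @time_at_last_flush) >= @buffer_time_out_in_seconds) then
    flush_to_consumers
  end
end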
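
The size check in consume measures the length of the buffer's inspect string, which is an estimate rather than an exact byte count, since it includes the brackets, quotes, and separators. The sketch below shows the same condition as a standalone predicate; flush_due? is a hypothetical helper, not part of the class.

```ruby
# Hypothetical helper mirroring the flush condition used in consume.
def flush_due?(buffer, size_limit_in_bytes, seconds_since_last_flush, time_out_in_seconds)
  buffer.inspect.size >= size_limit_in_bytes ||
    seconds_since_last_flush >= time_out_in_seconds
end

entries = ['disk full', 'retrying']
estimated_size = entries.inspect.size            # length of '["disk full", "retrying"]'
payload_size = entries.sum { |entry| entry.bytesize } # bytes of the strings alone
```

For these two entries the estimate is 25, while the strings themselves occupy 17 bytes, so the size limit is reached somewhat earlier than a raw byte count would suggest.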

#flush_to_consumers ⇒ Object

Flushes the buffer to the consumers.



# File 'lib/log/log_consumer.rb', line 70

def flush_to_consumers
  @consumers.each { |consumer| consumer.push_data @buffer}
  @buffer.clear
  @time_at_last_flush = Time.now.to_i
end