Class: Anschel::Input
- Inherits: Object
  - Object
  - Anschel::Input
- Defined in:
- lib/anschel/input.rb,
lib/anschel/input/base.rb,
lib/anschel/input/kafka.rb,
lib/anschel/input/rabbitmq.rb
Defined Under Namespace
Classes: Base, Kafka, RabbitMQ
Instance Method Summary collapse
- #initialize(config, qsize, stats, log, leftovers = []) ⇒ Input (constructor): A new instance of Input.
- #leftovers ⇒ Object
- #shift ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(config, qsize, stats, log, leftovers = []) ⇒ Input
Returns a new instance of Input.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/anschel/input.rb', line 7

# Creates the shared event queue, re-enqueues any leftover events and
# instantiates one input per entry of +config+.
#
# @param config [Array<Hash>] input definitions; the :kind key of each entry
#   selects the input class ('kafka' or 'rabbitmq'), the remaining keys are
#   handed to that class's constructor. The caller's hashes are not modified.
# @param qsize [Integer, nil] capacity of the queue; 2000 when nil
# @param stats [#create, #get] stats collector, also passed on to every input
# @param log [#info, #warn] logger, also passed on to every input
# @param leftovers [Array, nil] events pushed back onto the queue; nil is
#   treated as an empty list
# @raise [RuntimeError] when an entry's :kind is not a known input type
def initialize(config, qsize, stats, log, leftovers = [])
  log.info event: 'input-loading'
  log.info event: 'input-config', config: config, qsize: qsize

  @queue = SizedQueue.new(qsize || 2000)

  leftovers ||= []
  # Pushing onto a full SizedQueue blocks, so the leftovers are re-enqueued
  # from a separate thread (presumably to keep the constructor from stalling
  # when there are more leftovers than queue slots).
  Thread.new do
    log.warn event: 'input-leftovers', leftovers_size: leftovers.size
    leftovers.each { |leftover| @queue << leftover }
  end

  @inputs = []
  stats.create 'input'
  stats.get 'input' # return value unused; kept in case the call has side effects

  config.each do |input|
    # Work on a copy so removing :kind does not alter the caller's config.
    options = input.dup
    kind = options.delete(:kind)

    input_class =
      case kind
      when 'kafka'    then Input::Kafka
      when 'rabbitmq' then Input::RabbitMQ
      else raise "Unknown input type: #{kind.inspect}"
      end

    @inputs << input_class.new(@queue, options, stats, log)
    log.info event: 'input-loaded', kind: kind
  end

  log.info event: 'input-fully-loaded'
end
Instance Method Details
#leftovers ⇒ Object
49 50 51 |
# File 'lib/anschel/input.rb', line 49

# Returns the value stored in @leftovers, or nil when that instance variable
# has not been assigned yet.
#
# @return [Object, nil]
def leftovers
  defined?(@leftovers) ? @leftovers : nil
end
#shift ⇒ Object
54 55 56 57 58 59 60 61 62 |
# File 'lib/anschel/input.rb', line 54

# Removes the next event from the queue and returns it as a String.
# Strings are returned unchanged; any other object is converted with #to_s.
# Queue#shift waits while the queue is empty, so this call may block.
#
# @return [String]
def shift
  event = @queue.shift
  # Was `event..to_s`, which builds a Range instead of calling #to_s.
  event.is_a?(String) ? event : event.to_s
end
#stop ⇒ Object
41 42 43 44 45 46 |
# File 'lib/anschel/input.rb', line 41

# Stops every input, then drains whatever is still queued into @leftovers.
# The work happens only once: later calls return the same array without
# stopping the inputs again.
#
# @return [Array<String>] the events that were still queued, in queue order
def stop
  return @leftovers if defined? @leftovers

  @inputs.each(&:stop)
  @leftovers = []
  @leftovers << shift until @queue.empty?
  # Return the array explicitly; the `until` loop itself evaluates to nil,
  # which made the first call disagree with the memoised later calls.
  @leftovers
end