Class: Anschel::Input

Inherits:
Object
  • Object
show all
Defined in:
lib/anschel/input.rb,
lib/anschel/input/base.rb,
lib/anschel/input/kafka.rb,
lib/anschel/input/rabbitmq.rb

Defined Under Namespace

Classes: Base, Kafka, RabbitMQ

Instance Method Summary collapse

Constructor Details

#initialize(config, qsize, stats, log, leftovers = []) ⇒ Input

Returns a new instance of Input.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/anschel/input.rb', line 7

# Builds the input side: one bounded queue shared by every configured input.
#
# @param config [Enumerable<Hash>] one hash per input; the :kind entry picks
#   the input class ('kafka' or 'rabbitmq') and the remaining entries are
#   handed to that class's constructor
# @param qsize [Integer, nil] capacity of the shared queue (2000 when nil)
# @param stats [#create, #get] stats object, also passed to every input
# @param log [#info, #warn] logger, also passed to every input
# @param leftovers [Array, nil] events to push back onto the queue
# @raise [RuntimeError] when an entry's :kind is not a known input type
def initialize config, qsize, stats, log, leftovers=[]
  log.info event: 'input-loading'
  log.info event: 'input-config', config: config, qsize: qsize

  @queue = SizedQueue.new(qsize || 2000)

  # SizedQueue#<< blocks once the queue is full, so leftovers are re-queued
  # from a background thread rather than stalling the constructor.
  Thread.new do
    leftovers ||= []
    log.warn event: 'input-leftovers', leftovers_size: leftovers.size
    leftovers.each { |l| @queue << l }
  end

  @inputs = []

  stats.create 'input'
  stats.get 'input'

  config.each do |input|
    # Work on a copy so the caller's config hash keeps its :kind entry.
    settings = input.dup
    case settings.delete(:kind)
    when 'kafka'
      @inputs << Input::Kafka.new(@queue, settings, stats, log)
      log.info event: 'input-loaded', kind: 'kafka'
    when 'rabbitmq'
      @inputs << Input::RabbitMQ.new(@queue, settings, stats, log)
      log.info event: 'input-loaded', kind: 'rabbitmq'
    else
      raise 'Unknown input type'
    end
  end

  log.info event: 'input-fully-loaded'
end

Instance Method Details

#leftovers ⇒ Object



49
50
51
# File 'lib/anschel/input.rb', line 49

# Returns the value stored in @leftovers, or nil while that instance
# variable has not been assigned yet.
def leftovers
  defined?(@leftovers) ? @leftovers : nil
end

#shift ⇒ Object



54
55
56
57
58
59
60
61
62
# File 'lib/anschel/input.rb', line 54

# Takes the next entry off @queue and returns it as text: a String entry is
# returned unchanged, anything else is reduced to its #message converted
# with #to_s.
def shift
  item = @queue.shift
  return item if String === item

  item.message.to_s
end

#stop ⇒ Object



41
42
43
44
45
46
# File 'lib/anschel/input.rb', line 41

# Stops every input and drains whatever is still queued into @leftovers.
# Repeat calls do nothing and return the leftovers collected the first time.
#
# @return [Array] the drained events, as produced by #shift
def stop
  return @leftovers if defined? @leftovers
  @inputs.each(&:stop)
  @leftovers = []
  @leftovers << shift until @queue.empty?
  # Return the array explicitly: the `until` modifier loop evaluates to nil.
  @leftovers
end