Module: Fluent::WireProtocolCompatInput::SocketUtilBaseInputMethods

Defined in:
lib/fluent/plugin/in_wire_protocol_compat.rb

Instance Method Summary collapse

Instance Method Details

#on_message(msg, addr) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/fluent/plugin/in_wire_protocol_compat.rb', line 79

# Parses one raw message and emits the resulting events as a single stream.
#
# The parser may yield any of the following shapes:
# * +(tag, time, record)+          - one event under the yielded tag
# * +(tag, [[time, record], ...])+ - several events under the yielded tag
# * +(time, record)+ with an Integer time - one event under +@tag+
#
# Anything else (for example +nil, nil+ or no values at all) is treated as
# an unmatched message: a warning is logged and nothing is emitted.
#
# When +@source_host_key+ is set, +addr[3]+ is stored in every record under
# that key before the record is added to the stream.
#
# @param msg [String] raw message handed to the parser
# @param addr [Array] peer address info; only +addr[3]+ is read here
#   (presumably the peer host/IP - confirm against the caller)
# @return [Object] whatever the parser call / log call returns; callers
#   should not rely on it
def on_message(msg, addr)
  @parser.parse(msg) do |*parsed|
    tag, records =
      case parsed.first
      when String
        parsed_tag, time_or_entries, single_record = parsed
        entries = time_or_entries.is_a?(Array) ? time_or_entries : [[time_or_entries, single_record]]
        [parsed_tag, entries]
      when Integer
        [@tag, [parsed]]
      else
        # Unmatched parse result: leave the record list empty so the guard
        # below logs the warning instead of calling #first on nil.
        [nil, []]
      end

    # Only the first entry is validated, as before.
    first_time, first_record = records.first
    unless first_time && first_record
      log.warn "pattern not match: #{msg.inspect}"
      # Leaves on_message entirely, not just this block invocation.
      return
    end

    es = MultiEventStream.new
    records.each do |event_time, event_record|
      event_record[@source_host_key] = addr[3] if @source_host_key
      es.add(event_time, event_record)
    end
    router.emit_stream(tag, es)
  end
rescue => e
  # Any failure while parsing or emitting is logged together with the raw
  # message and is not re-raised.
  log.error msg.dump, :error => e, :error_class => e.class, :host => addr[3]
  log.error_backtrace
end