Class: Fluent::Plugin::UnixInput::Handler

Inherits:
Coolio::Socket
  • Object
show all
Defined in:
lib/fluent/plugin/in_unix.rb

Instance Method Summary collapse

Constructor Details

#initialize(io, log, on_message) ⇒ Handler

Returns a new instance of Handler.



147
148
149
150
151
152
# File 'lib/fluent/plugin/in_unix.rb', line 147

# Builds a handler around an already-accepted socket.
#
# @param io the socket object handed straight to the parent constructor
# @param log logger kept in @log; the other callbacks in this class call
#   trace / error / error_backtrace on it
# @param on_message callable kept in @on_message; on_read installs it as the
#   JSON parser's completion callback and on_read_msgpack passes it as a block
def initialize(io, log, on_message)
  super(io)

  @log = log
  @on_message = on_message
end

Instance Method Details

#on_close ⇒ Object



190
191
192
# File 'lib/fluent/plugin/in_unix.rb', line 190

# Close callback: emits a trace-level log line identifying this handler by
# its object_id. The message is built inside the block, so it is only
# evaluated if the logger decides to yield.
def on_close
  @log.trace do
    "closed fluent socket object_id=#{object_id}"
  end
end

#on_connect ⇒ Object



154
155
# File 'lib/fluent/plugin/in_unix.rb', line 154

# Connect callback with an empty body: this handler performs no work here
# and the method returns nil.
# NOTE(review): presumably invoked by the Coolio::Socket parent once the
# connection is established — confirm against the Cool.io documentation.
def on_connect
end

#on_read(data) ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/fluent/plugin/in_unix.rb', line 157

# Handles the first chunk read from the socket and picks the wire format.
#
# The first character of +data+ decides the format: '{' or '[' selects the
# JSON path (a Yajl parser whose completion callback is @on_message);
# anything else, including an empty chunk, selects the MessagePack path.
# The chosen reader is then installed as this instance's own on_read, so
# later calls go straight to it, and the current chunk is passed to it.
#
# @param data [String] chunk read from the socket
# @return whatever the selected reader returns for this chunk
def on_read(data)
  case data[0]
  when '{', '['
    reader = method(:on_read_json)
    @parser = Yajl::Parser.new
    @parser.on_parse_complete = @on_message
  else
    reader = method(:on_read_msgpack)
    @parser = Fluent::MessagePackFactory.msgpack_unpacker
  end

  # Replace on_read on this object only; other handler instances still
  # start with format detection.
  define_singleton_method(:on_read, reader)
  reader.call(data)
end

#on_read_json(data) ⇒ Object



174
175
176
177
178
179
180
# File 'lib/fluent/plugin/in_unix.rb', line 174

# Appends a chunk to the JSON parser held in @parser.
#
# If the parser raises a StandardError, the error is logged (message plus
# backtrace) and the connection is closed instead of re-raising.
#
# @param data [String] chunk read from the socket
# @return the result of the parser append, or of close after a failure
def on_read_json(data)
  begin
    @parser << data
  rescue StandardError => err
    @log.error("unexpected error in json payload", error: err.to_s)
    @log.error_backtrace
    close
  end
end

#on_read_msgpack(data) ⇒ Object



182
183
184
185
186
187
188
# File 'lib/fluent/plugin/in_unix.rb', line 182

# Feeds a chunk to the MessagePack unpacker held in @parser, passing
# @on_message as the block given to feed_each.
#
# If feeding raises a StandardError, the error is logged (message plus
# backtrace) and the connection is closed instead of re-raising.
#
# @param data [String] chunk read from the socket
# @return the result of feed_each, or of close after a failure
def on_read_msgpack(data)
  begin
    @parser.feed_each(data, &@on_message)
  rescue StandardError => err
    @log.error("unexpected error in msgpack payload", error: err.to_s)
    @log.error_backtrace
    close
  end
end