Class: Capturing::PrintRunner

Inherits:
Runner
  • Object
show all
Defined in:
lib/capture.rb

Instance Method Summary collapse

Methods inherited from Runner

#destroy!, #run!

Constructor Details

#initialize(e, writer:) ⇒ PrintRunner

Returns a new instance of PrintRunner.



40
41
42
43
44
45
# File 'lib/capture.rb', line 40

def initialize(e, writer:)
  super e
  @writer = writer
  @unpacker = Fluent::Engine.msgpack_factory.unpacker
  @time_formatter = Fluent::TimeFormatter.new("%Y-%m-%d %H:%M:%S %z", true, nil) # TODO support format
end

Instance Method Details

#format_message(tag, entry) ⇒ Object



69
70
71
72
# File 'lib/capture.rb', line 69

def format_message(tag, entry)
  time, record = entry
  "#{@time_formatter.format(time)} | tag=#{tag} msg=#{record.inspect}\n"
end

#on_packet(data) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/capture.rb', line 47

def on_packet(data)
  @unpacker.feed_each(data) do |msg|
    tag = msg[0]
    entries = msg[1]
    case entries
    when String
      option = msg[2]
      size = (option && option["size"]) || 0
      es_class = (option && option["compressed"] == "gzip") ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
      es_class.new(entries, nil, size.to_i).each do |time, record|
        @writer.write(format_message(tag, [time, record]))
      end
    when Array
      entries.each do |e|
        @writer.write(format_message(tag, e))
      end
    else
      raise "Unsuooprted entry format '#{entry.class}'" # TODO
    end
  end
end