Class: Capturing::PrintRunner
Instance Method Summary collapse
- #format_message(tag, entry) ⇒ Object
- #initialize(e, writer:) ⇒ PrintRunner (constructor)
  Returns a new instance of PrintRunner.
- #on_packet(data) ⇒ Object
Methods inherited from Runner
Constructor Details
#initialize(e, writer:) ⇒ PrintRunner
Returns a new instance of PrintRunner.
40 41 42 43 44 45 |
# File 'lib/capture.rb', line 40
# Sets up the writer, the msgpack unpacker and the time formatter used when
# printing captured events.
#
# @param e forwarded unchanged to the parent class initializer
# @param writer stored in @writer; #on_packet calls +write+ on it
def initialize(e, writer:)
  super(e)
  @writer = writer
  @unpacker = Fluent::Engine.msgpack_factory.unpacker

  # TODO support format
  # NOTE(review): the trailing `true, nil` are presumably the formatter's
  # localtime / timezone arguments -- confirm against Fluent::TimeFormatter.
  time_format = "%Y-%m-%d %H:%M:%S %z"
  @time_formatter = Fluent::TimeFormatter.new(time_format, true, nil)
end
Instance Method Details
#format_message(tag, entry) ⇒ Object
69 70 71 72 |
# File 'lib/capture.rb', line 69
# Renders a single event as one line of output.
#
# @param tag [Object] interpolated after "tag="
# @param entry [Array] a (time, record) pair; time is passed to
#   @time_formatter.format and record is rendered with #inspect
# @return [String] "<formatted time> | tag=<tag> msg=<record.inspect>\n"
def format_message(tag, entry)
  time, record = entry
  "#{@time_formatter.format(time)} | tag=#{tag} msg=#{record.inspect}\n"
end
#on_packet(data) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/capture.rb', line 47
# Feeds +data+ to the msgpack unpacker and, for every decoded message, writes
# one formatted line per event to @writer.
#
# Each decoded message is indexed as msg[0] = tag, msg[1] = entries and
# (for String entries) msg[2] = option.
#
# @param data bytes handed to @unpacker.feed_each
# @raise [RuntimeError] when msg[1] is neither a String nor an Array
def on_packet(data)
  @unpacker.feed_each(data) do |msg|
    tag = msg[0]
    entries = msg[1]

    case entries
    when String
      # A String payload is a packed event stream; the option hash may carry
      # its size and whether it is gzip-compressed.
      option = msg[2]
      size = (option && option["size"]) || 0
      es_class =
        if option && option["compressed"] == "gzip"
          Fluent::CompressedMessagePackEventStream
        else
          Fluent::MessagePackEventStream
        end
      es_class.new(entries, nil, size.to_i).each do |time, record|
        @writer.write(format_message(tag, [time, record]))
      end
    when Array
      # Already-decoded list of entries; each is passed to format_message as is.
      entries.each do |entry|
        @writer.write(format_message(tag, entry))
      end
    else
      # Report the class of the offending payload (`entries`, not the
      # undefined `entry` that previously turned this into a NameError).
      raise "Unsupported entry format '#{entries.class}'" # TODO
    end
  end
end