Module: Fluent::Test::Driver::EventFeeder

Included in:
Filter, MultiOutput, Output
Defined in:
lib/fluent/test/driver/event_feeder.rb

Instance Method Summary collapse

Instance Method Details

#feed(*args) ⇒ Object

d.run do

d.feed('tag', time, {record})
d.feed('tag', [ [time, {record}], [time, {record}], ... ])
d.feed('tag', es)

end d.run(default_tag: ‘tag’) do

d.feed({record})
d.feed(time, {record})
d.feed([ [time, {record}], [time, {record}], ... ])
d.feed(es)

end



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/fluent/test/driver/event_feeder.rb', line 57

def feed(*args)
  case args.size
  when 1
    raise ArgumentError, "tag not specified without default_tag" unless @default_tag
    case args.first
    when Fluent::EventStream
      feed_to_plugin(@default_tag, args.first)
    when Array
      feed_to_plugin(@default_tag, ArrayEventStream.new(args.first))
    when Hash
      record = args.first
      time = Fluent::EventTime.now
      feed_to_plugin(@default_tag, OneEventStream.new(time, record))
    else
      raise ArgumentError, "unexpected events object (neither event(Hash), EventStream nor Array): #{args.first.class}"
    end
  when 2
    if args[0].is_a?(String) && (args[1].is_a?(Array) || args[1].is_a?(Fluent::EventStream))
      tag, es = args
      es = ArrayEventStream.new(es) if es.is_a?(Array)
      feed_to_plugin(tag, es)
    elsif @default_tag && (args[0].is_a?(Fluent::EventTime) || args[0].is_a?(Integer)) && args[1].is_a?(Hash)
      time, record = args
      feed_to_plugin(@default_tag, OneEventStream.new(time, record))
    else
      raise ArgumentError, "unexpected values of argument: #{args[0].class}, #{args[1].class}"
    end
  when 3
    tag, time, record = args
    if tag.is_a?(String) && (time.is_a?(Fluent::EventTime) || time.is_a?(Integer)) && record.is_a?(Hash)
      feed_to_plugin(tag, OneEventStream.new(time, record))
    else
      raise ArgumentError, "unexpected values of argument: #{tag.class}, #{time.class}, #{record.class}"
    end
  else
    raise ArgumentError, "unexpected number of arguments: #{args}"
  end
end

#feed_to_plugin(tag, es) ⇒ Object



42
43
44
# File 'lib/fluent/test/driver/event_feeder.rb', line 42

def feed_to_plugin(tag, es)
  @instance.__send__(@feed_method, tag, es)
end

#initialize(klass, opts: {}, &block) ⇒ Object



24
25
26
27
28
# File 'lib/fluent/test/driver/event_feeder.rb', line 24

def initialize(klass, opts: {}, &block)
  super
  @default_tag = nil
  @feed_method = nil
end

#run(default_tag: nil, **kwargs, &block) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
# File 'lib/fluent/test/driver/event_feeder.rb', line 30

def run(default_tag: nil, **kwargs, &block)
  @feed_method = if @instance.respond_to?(:filter_stream)
                   :filter_stream
                 else
                   :emit_events
                 end
  if default_tag
    @default_tag = default_tag
  end
  super(**kwargs, &block)
end