Module: Fluent::NorikraPlugin::InputMixin

Included in:
Fluent::NorikraFilterOutput, Fluent::NorikraInput
Defined in:
lib/fluent/plugin/norikra/input.rb

Instance Method Summary collapse

Instance Method Details

#fetch_workerObject



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/norikra/input.rb', line 78

def fetch_worker
  while sleep(1)
    break unless @fetch_worker_running
    next unless fetchable?
    next if @fetch_queue.first.nil? || @fetch_queue.first.time > Time.now

    now = Time.now
    while @fetch_queue.first.time <= now
      req = @fetch_queue.shift

      begin
        data = req.fetch(client())
      rescue => e
        log.error "failed to fetch", :norikra => "#{@host}:#{@port}", :method => req.method, :target => req.target, :error => e.class, :message => e.message
      end

      if data
        data.each do |tag, event_array|
          next unless event_array
          event_array.each do |time,event|
            begin
              Fluent::Engine.emit(tag, time, event)
            rescue => e
              log.error "failed to emit event from norikra query", :norikra => "#{@host}:#{@port}", :error => e.class, :message => e.message, :tag => tag, :record => event
            end
          end
        end
      end

      insert_fetch_queue(req)
    end
  end
end

#insert_fetch_queue(request) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fluent/plugin/norikra/input.rb', line 62

def insert_fetch_queue(request)
  @fetch_queue_mutex.synchronize do
    request.next! if request.time < Time.now
    # if @fetch_queue.size > 0
    #   next_pos = @fetch_queue.bsearch{|req| req.time > request.time}
    #   @fetch_queue.insert(next_pos, request)
    # else
    #   @fetch_queue.push(request)
    # end
    @fetch_queue.push(request)
    @fetch_queue.sort!
  end
rescue => e
  log.error "unknown log encountered", :error_class => e.class, :message => e.message
end

#setup_input(conf) ⇒ Object

<fetch>

method event
target QUERY_NAME
interval 5s
tag    query_name
# tag    field FIELDNAME
# tag    string FIXED_STRING
tag_prefix norikra.event     # actual tag: norikra.event.QUERYNAME

</fetch> <fetch>

method sweep
target QUERY_GROUP # or unspecified => default
interval 60s
tag field group_by_key
tag_prefix norikra.query

</fetch>



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/fluent/plugin/norikra/input.rb', line 22

def setup_input(conf)
  @fetch_queue = []

  conf.elements.each do |e|
    next unless e.name == 'fetch'
    method = e['method']
    target = e['target']
    interval_str = e['interval']
    tag = e['tag']
    unless method && interval_str && tag
      raise Fluent::ConfigError, "<fetch> must be specified with method/interval/tag"
    end
    if method == 'event' and target.nil?
      raise Fluent::ConfigError, "<fetch> method 'event' requires 'target' for fetch target query name"
    end

    interval = Fluent::Config.time_value(interval_str)
    tag_type, tag_arg = tag.split(/ /, 2)
    req = FetchRequest.new(method, target, interval, tag_type, tag_arg, e['tag_prefix'])

    @fetch_queue << req
  end

  @fetch_queue_mutex = Mutex.new
end

#shutdown_inputObject



57
58
59
60
# File 'lib/fluent/plugin/norikra/input.rb', line 57

def shutdown_input
  # @fetch_thread.kill
  @fetch_thread.join
end

#start_inputObject



48
49
50
51
# File 'lib/fluent/plugin/norikra/input.rb', line 48

def start_input
  @fetch_worker_running = true
  @fetch_thread = Thread.new(&method(:fetch_worker))
end

#stop_inputObject



53
54
55
# File 'lib/fluent/plugin/norikra/input.rb', line 53

def stop_input
  @fetch_worker_running = false
end