Module: Fluent::NorikraPlugin::InputMixin
- Included in:
- Fluent::NorikraFilterOutput, Fluent::NorikraInput
- Defined in:
- lib/fluent/plugin/norikra/input.rb
Instance Method Summary collapse
- #fetch_worker ⇒ Object
- #insert_fetch_queue(request) ⇒ Object
- #setup_input(conf) ⇒ Object
  Example <fetch> sections:
    <fetch>
      method event
      target QUERY_NAME
      interval 5s
      tag query_name
      # tag field FIELDNAME
      # tag string FIXED_STRING
      tag_prefix norikra.event # actual tag: norikra.event.QUERYNAME
    </fetch>
    <fetch>
      method sweep
      target QUERY_GROUP # or unspecified => default
      interval 60s
      tag field group_by_key
      tag_prefix norikra.query
    </fetch>
- #shutdown_input ⇒ Object
- #start_input ⇒ Object
- #stop_input ⇒ Object
Instance Method Details
#fetch_worker ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/fluent/plugin/norikra/input.rb', line 78
#
# Polling loop executed by the fetch thread (see #start_input).
#
# Once per second it checks whether the worker should keep running and
# whether fetching is currently possible (+fetchable?+). Every request at the
# head of +@fetch_queue+ whose +time+ is not later than the current time is
# removed from the queue, fetched through +client+, its events are emitted via
# +Fluent::Engine.emit+, and the request is handed back to
# #insert_fetch_queue for re-scheduling.
#
# Errors raised while fetching or emitting are logged and do not stop the
# loop. The loop ends once +@fetch_worker_running+ is falsy.
def fetch_worker
  while sleep(1)
    break unless @fetch_worker_running
    next unless fetchable?

    now = Time.now
    # The nil check keeps the loop alive when the queue is (or becomes) empty,
    # e.g. when re-inserting a request failed and was only logged.
    while (req = @fetch_queue.first) && req.time <= now
      @fetch_queue.shift

      # Reset on every iteration: `while` does not open a new scope, so
      # without this a failed fetch would re-emit the previous request's data.
      data = nil
      begin
        data = req.fetch(client)
      rescue => e
        log.error "failed to fetch", :norikra => "#{@host}:#{@port}", :method => req.method, :target => req.target, :error => e.class, :message => e.message
      end

      if data
        data.each do |tag, event_array|
          next unless event_array

          event_array.each do |time, event|
            begin
              Fluent::Engine.emit(tag, time, event)
            rescue => e
              log.error "failed to emit event from norikra query", :norikra => "#{@host}:#{@port}", :error => e.class, :message => e.message, :tag => tag, :record => event
            end
          end
        end
      end

      insert_fetch_queue(req)
    end
  end
end
#insert_fetch_queue(request) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/fluent/plugin/norikra/input.rb', line 62
#
# Puts +request+ (back) into +@fetch_queue+ and re-sorts the queue.
#
# The whole update runs while holding +@fetch_queue_mutex+. If the request's
# +time+ is already in the past, +request.next!+ is called before queueing.
# Sorting uses +Array#sort!+ without a block, so queued requests must be
# comparable with each other via +<=>+.
#
# Any StandardError raised here is logged instead of being propagated.
def insert_fetch_queue(request)
  @fetch_queue_mutex.synchronize do
    request.next! if request.time < Time.now
    # Disabled sketch of a binary-search insert, kept from the original:
    # if @fetch_queue.size > 0
    #   next_pos = @fetch_queue.bsearch{|req| req.time > request.time}
    #   @fetch_queue.insert(next_pos, request)
    # else
    #   @fetch_queue.push(request)
    # end
    @fetch_queue.push(request)
    @fetch_queue.sort!
  end
rescue => e
  log.error "unknown log encountered", :error_class => e.class, :message => e.message
end
#setup_input(conf) ⇒ Object
<fetch>
method event
target QUERY_NAME
interval 5s
tag query_name
# tag field FIELDNAME
# tag string FIXED_STRING
tag_prefix norikra.event # actual tag: norikra.event.QUERYNAME
</fetch>
<fetch>
method sweep
target QUERY_GROUP # or unspecified => default
interval 60s
tag field group_by_key
tag_prefix norikra.query
</fetch>
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/fluent/plugin/norikra/input.rb', line 22
#
# Builds +@fetch_queue+ from the <fetch> elements of +conf+ and creates the
# mutex that guards the queue.
#
# Each <fetch> element must provide +method+, +interval+ and +tag+; +target+
# is additionally required when +method+ is 'event'. The +tag+ value is split
# on the first space into a tag type and an optional argument. Elements with
# any other name are ignored.
#
# @param conf configuration object; only +conf.elements+ is read here
# @raise [Fluent::ConfigError] when a <fetch> element lacks a required value
def setup_input(conf)
  @fetch_queue = []

  conf.elements.each do |e|
    next unless e.name == 'fetch'

    # Named fetch_method (not method) to avoid shadowing Object#method.
    fetch_method = e['method']
    target = e['target']
    interval_str = e['interval']
    tag = e['tag']

    unless fetch_method && interval_str && tag
      raise Fluent::ConfigError, "<fetch> must be specified with method/interval/tag"
    end
    if fetch_method == 'event' && target.nil?
      raise Fluent::ConfigError, "<fetch> method 'event' requires 'target' for fetch target query name"
    end

    interval = Fluent::Config.time_value(interval_str)
    tag_type, tag_arg = tag.split(/ /, 2)
    @fetch_queue << FetchRequest.new(fetch_method, target, interval, tag_type, tag_arg, e['tag_prefix'])
  end

  @fetch_queue_mutex = Mutex.new
end
#shutdown_input ⇒ Object
57 58 59 60 |
# File 'lib/fluent/plugin/norikra/input.rb', line 57
#
# Waits for the fetch thread to finish. The thread is joined rather than
# killed. Does nothing when no fetch thread has been created, so calling this
# without a prior #start_input does not raise.
def shutdown_input
  # @fetch_thread.kill
  @fetch_thread.join if @fetch_thread
end
#start_input ⇒ Object
48 49 50 51 |
# File 'lib/fluent/plugin/norikra/input.rb', line 48
#
# Marks the fetch worker as running and launches #fetch_worker in a new
# thread, which is kept in +@fetch_thread+.
def start_input
  @fetch_worker_running = true
  @fetch_thread = Thread.new { fetch_worker }
end
#stop_input ⇒ Object
53 54 55 |
# File 'lib/fluent/plugin/norikra/input.rb', line 53
#
# Clears the running flag that #fetch_worker checks on each pass of its loop.
# The thread itself is not joined here.
def stop_input
  @fetch_worker_running = false
end