Class: LogStash::Inputs::LogService
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::LogService
- Defined in:
- lib/logstash/inputs/logservice.rb
Instance Method Summary
- #consume(queue) ⇒ Object
-
#initialize(*args) ⇒ LogService
constructor
attr_accessor :worker.
- #register ⇒ Object
- #run(queue) ⇒ Object
- #stop ⇒ Object
- #teardown ⇒ Object
Constructor Details
#initialize(*args) ⇒ LogService
attr_accessor :worker
17 18 19 |
# File 'lib/logstash/inputs/logservice.rb', line 17
# Builds the input by handing every constructor argument, unchanged, to the
# superclass constructor. No plugin-specific state is set up here.
def initialize(*args)
  # Bare `super` forwards the same argument list this method received.
  super
end
Instance Method Details
#consume(queue) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/logstash/inputs/logservice.rb', line 78
# Moves messages from the internal blocking queue into the pipeline queue
# until the plugin is asked to stop.
#
# Each pass fully drains @blockingQueue (the stop flag is only checked between
# passes), then sleeps 10 ms before looking again.
#
# A message that cannot be turned into an event is logged and dropped; the
# loop then carries on with the next message. The previous implementation used
# an unbounded `retry`, which polled again inside the rescued block and could
# wrap a nil poll result (empty queue) into a spurious event.
#
# @param queue [#<<] destination the decorated events are appended to
# @return [nil]
def consume(queue)
  until stop?
    until @blockingQueue.isEmpty
      msg = @blockingQueue.poll
      # poll returns nil when the queue has been emptied in the meantime;
      # never wrap that into an event.
      next if msg.nil?

      begin
        event = LogStash::Event.new(msg)
        decorate(event)
        queue << event
      rescue StandardError => e
        # StandardError rather than Exception, so Interrupt/SystemExit are
        # not swallowed here.
        @logger.error("Consume logstash-input-logservice",
                      :endpoint => @endpoint,
                      :project => @project,
                      :logstore => @logstore,
                      :consumer_group => @consumer_group,
                      :consumer_name => @consumer_name,
                      :position => @position,
                      :checkpoint_second => @checkpoint_second,
                      :include_meta => @include_meta,
                      :consumer_name_with_ip => @consumer_name_with_ip,
                      :exception => e)
      end
    end
    sleep(0.01)
  end
end
#register ⇒ Object
48 49 50 51 52 |
# File 'lib/logstash/inputs/logservice.rb', line 48
# Writes one info-level log entry listing the settings this input was
# configured with. Nothing else is initialised here.
def register
  @logger.info(
    "Init logstash-input-logservice",
    endpoint: @endpoint,
    project: @project,
    logstore: @logstore,
    consumer_group: @consumer_group,
    consumer_name: @consumer_name,
    position: @position,
    checkpoint_second: @checkpoint_second,
    include_meta: @include_meta,
    consumer_name_with_ip: @consumer_name_with_ip
  )
end
#run(queue) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/logstash/inputs/logservice.rb', line 54
# Starts the LogHub worker and then blocks in #consume, which feeds `queue`.
#
# Steps, in order:
#   1. look up the local host address and derive the consumer-name suffixes
#      (optional "_<ip>" plus "_<pid>");
#   2. log the effective settings;
#   3. create the bounded (1000 entries) blocking queue that is handed to the
#      worker and later read by #consume;
#   4. start the worker and enter the consume loop.
#
# Any exception raised along the way is logged and swallowed, so this method
# returns normally instead of raising.
#
# @param queue [#<<] pipeline queue passed through to #consume
def run(queue)
  @local_address = java.net.InetAddress.getLocalHost.getHostAddress
  @ip_suffix = @consumer_name_with_ip ? '_' + @local_address : ''
  @process_pid = "_#{Process.pid}"

  @logger.info(
    "Running logstash-input-logservice",
    endpoint: @endpoint,
    project: @project,
    logstore: @logstore,
    consumer_group: @consumer_group,
    consumer_name: @consumer_name,
    position: @position,
    checkpoint_second: @checkpoint_second,
    include_meta: @include_meta,
    consumer_name_with_ip: @consumer_name_with_ip,
    local_address: @local_address
  )

  @blockingQueue = java.util.concurrent.LinkedBlockingQueue.new(1000)
  @logHubStarter = LogHubStarter.new

  # Full consumer name: configured name + optional IP suffix + process id.
  worker_name = @consumer_name + @ip_suffix + @process_pid
  @logHubStarter.startWorker(
    @endpoint, @access_id, @access_key,
    @project, @logstore, @consumer_group, worker_name,
    @position, @checkpoint_second, @include_meta, @blockingQueue,
    @proxy_host, @proxy_port, @proxy_username, @proxy_password,
    @proxy_domain, @proxy_workstation, @fetch_interval_millis
  )

  consume(queue)
rescue Exception => e
  @logger.error(
    "Start logstash-input-logservice",
    endpoint: @endpoint,
    project: @project,
    logstore: @logstore,
    consumer_group: @consumer_group,
    consumer_name: @consumer_name,
    position: @position,
    checkpoint_second: @checkpoint_second,
    include_meta: @include_meta,
    consumer_name_with_ip: @consumer_name_with_ip,
    exception: e
  )
end
#stop ⇒ Object
97 98 99 |
# File 'lib/logstash/inputs/logservice.rb', line 97
# Stops the LogHub worker, if one was created.
#
# @logHubStarter is only assigned inside #run, and #run swallows its own
# start-up errors, so this method can be reached while the variable is still
# nil. In that case there is nothing to stop and the call is a no-op instead
# of raising NoMethodError.
#
# @return [Object, nil] whatever stopWorker returns, or nil when no worker exists
def stop
  @logHubStarter.stopWorker if @logHubStarter
end
#teardown ⇒ Object
101 102 103 104 |
# File 'lib/logstash/inputs/logservice.rb', line 101
# Flags the input as interrupted and then calls `finished`.
# NOTE(review): `finished` is not defined in this view; presumably it comes
# from the plugin base class -- confirm against the Logstash version in use.
def teardown
  @interrupted = true
  finished
end