Class: LogStash::Inputs::LogService
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::LogService
- Defined in:
- lib/logstash/inputs/logservice.rb
Constant Summary collapse
- Processor =
com.aliyun.log.logstash
Instance Attribute Summary collapse
-
#worker ⇒ Object
Returns the value of attribute worker.
Instance Method Summary collapse
- #consume(queue) ⇒ Object
-
#initialize(*args) ⇒ LogService
constructor
A new instance of LogService.
- #register ⇒ Object
- #run(queue) ⇒ Object
- #stop ⇒ Object
- #teardown ⇒ Object
Constructor Details
#initialize(*args) ⇒ LogService
Returns a new instance of LogService.
17 18 19 |
# File 'lib/logstash/inputs/logservice.rb', line 17 def initialize(*args) super(*args) end |
Instance Attribute Details
#worker ⇒ Object
Returns the value of attribute worker.
16 17 18 |
# File 'lib/logstash/inputs/logservice.rb', line 16 def worker @worker end |
Instance Method Details
#consume(queue) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/logstash/inputs/logservice.rb', line 64 def consume(queue) while !stop? while !@blockingQueue.isEmpty begin msg = @blockingQueue.poll event = LogStash::Event.new(msg) decorate(event) queue << event rescue Exception => e @logger.error("Consume logstash-input-logservice", :endpoint => @endpoint, :project => @project, :logstore => @logstore, :consumer_group => @consumer_group, :consumer_name => @consumer_name, :position => @position, :checkpoint_second => @checkpoint_second, :include_meta => , :consumer_name_with_ip => @consumer_name_with_ip, :exception => e) retry end end Stud.stoppable_sleep(@checkpoint_second) { stop? } end # loop end |
#register ⇒ Object
38 39 40 41 42 |
# File 'lib/logstash/inputs/logservice.rb', line 38 def register @logger.info("Init logstash-input-logservice", :endpoint => @endpoint, :project => @project, :logstore => @logstore, :consumer_group => @consumer_group, :consumer_name => @consumer_name, :position => @position, :checkpoint_second => @checkpoint_second, :include_meta => ,:consumer_name_with_ip => @consumer_name_with_ip) end |
#run(queue) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/logstash/inputs/logservice.rb', line 44 def run(queue) @local_address = java.net.InetAddress.getLocalHost().getHostAddress(); @ip_suffix = '' if @consumer_name_with_ip @ip_suffix = '_' + @local_address end @process_pid = "_#{Process.pid}" @logger.info("Running logstash-input-logservice",:local_address => @local_address) @blockingQueue = java.util.concurrent.LinkedBlockingQueue.new(1000) LogHubStarter.startWorker(@endpoint, @access_id, @access_key, @project, @logstore, @consumer_group, @consumer_name + @ip_suffix + @process_pid, @position, @checkpoint_second, , @blockingQueue) consume(queue) rescue Exception => e @logger.error("Start logstash-input-logservice", :endpoint => @endpoint, :project => @project, :logstore => @logstore, :consumer_group => @consumer_group, :consumer_name => @consumer_name, :position => @position, :checkpoint_second => @checkpoint_second, :include_meta => , :consumer_name_with_ip => @consumer_name_with_ip, :exception => e) end |
#stop ⇒ Object
84 85 86 87 88 89 90 |
# File 'lib/logstash/inputs/logservice.rb', line 84 def stop # nothing to do in this case so it is not necessary to define stop # examples of common "stop" tasks: # * close sockets (unblocking blocking reads/accepts) # * cleanup temporary files # * terminate spawned threads end |
#teardown ⇒ Object
92 93 94 95 |
# File 'lib/logstash/inputs/logservice.rb', line 92 def teardown @interrupted = true finished end |