Class: LogStash::Inputs::LogService

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/logservice.rb

Constant Summary collapse

Processor =
com.aliyun.log.logstash

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ LogService

Returns a new instance of LogService.



17
18
19
# File 'lib/logstash/inputs/logservice.rb', line 17

# Builds the plugin instance by handing every constructor argument straight to
# the parent class; no additional state is set up here.
#
# @param args [Array] constructor arguments, forwarded unchanged
def initialize(*args)
  super
end

Instance Attribute Details

#worker ⇒ Object

Returns the value of attribute worker.



16
17
18
# File 'lib/logstash/inputs/logservice.rb', line 16

# Reader for the @worker instance variable.
#
# NOTE(review): none of the methods shown alongside this one assign @worker;
# confirm where it is set before relying on a non-nil value.
#
# @return [Object, nil] the current value of @worker (nil when unassigned)
def worker
  @worker
end

Instance Method Details

#consume(queue) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/logstash/inputs/logservice.rb', line 64

# Moves messages from the internal buffer (@blockingQueue) into the Logstash
# pipeline until the plugin is asked to stop.
#
# Each pass drains the buffer completely, then sleeps for up to
# @checkpoint_second seconds (interruptible via stop?) before checking again.
# Every message is wrapped in a LogStash::Event, decorated and pushed to
# +queue+.
#
# A message that fails to convert or enqueue is logged and skipped. It is
# deliberately NOT retried: the message has already been removed from the
# buffer, so a retry would poll the *next* entry (or nil once the buffer is
# empty, producing a bogus event) and could spin forever on a persistent error.
#
# NOTE(review): only StandardError is handled so that Interrupt/SystemExit are
# not swallowed. Confirm that Java exceptions raised by the pipeline queue are
# still matched on the target JRuby version; add java.lang.Exception to the
# rescue list if they are not.
#
# @param queue [#<<] pipeline queue that receives the decorated events
# @return [nil]
def consume(queue)
  until stop?
    # poll is non-blocking and yields nil once the buffer is empty, which ends
    # the drain loop without a separate emptiness check.
    while (msg = @blockingQueue.poll)
      begin
        event = LogStash::Event.new(msg)
        decorate(event)
        queue << event
      rescue StandardError => e
        @logger.error("Consume logstash-input-logservice", :endpoint => @endpoint, :project => @project, :logstore => @logstore,
                      :consumer_group => @consumer_group, :consumer_name => @consumer_name, :position => @position,
                      :checkpoint_second => @checkpoint_second, :include_meta => @include_meta, :consumer_name_with_ip => @consumer_name_with_ip, :exception => e)
      end
    end
    Stud.stoppable_sleep(@checkpoint_second) { stop? }
  end
end

#register ⇒ Object



38
39
40
41
42
# File 'lib/logstash/inputs/logservice.rb', line 38

# Writes an info-level log entry describing the configuration this input was
# set up with. The values are read from instance variables and are not
# modified.
#
# @return [Object] whatever @logger.info returns
def register
  @logger.info(
    "Init logstash-input-logservice",
    endpoint: @endpoint,
    project: @project,
    logstore: @logstore,
    consumer_group: @consumer_group,
    consumer_name: @consumer_name,
    position: @position,
    checkpoint_second: @checkpoint_second,
    include_meta: @include_meta,
    consumer_name_with_ip: @consumer_name_with_ip
  )
end

#run(queue) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/logstash/inputs/logservice.rb', line 44

# Starts the LogHub worker and then blocks in #consume, which forwards the
# worker's messages to the pipeline queue.
#
# The consumer name handed to the worker is @consumer_name, followed by
# "_<local ip>" when @consumer_name_with_ip is set, and always followed by
# "_<pid>" of the current process. The worker delivers messages through
# @blockingQueue, a bounded buffer with room for 1000 entries.
#
# Errors raised during start-up or consumption are logged and swallowed, so
# the method returns instead of raising for them. Only StandardError is
# handled: process-control exceptions such as Interrupt and SystemExit are
# allowed to propagate rather than being silently absorbed.
#
# NOTE(review): confirm that Java exceptions thrown by LogHubStarter are still
# matched by StandardError on the target JRuby version; add
# java.lang.Exception to the rescue list if they are not.
#
# @param queue [Object] pipeline queue, passed through to #consume
# @return [Object] result of #consume, or of @logger.error after a failure
def run(queue)
  @local_address = java.net.InetAddress.getLocalHost.getHostAddress
  @ip_suffix = @consumer_name_with_ip ? "_#{@local_address}" : ''
  @process_pid = "_#{Process.pid}"
  @logger.info("Running logstash-input-logservice", :local_address => @local_address)
  @blockingQueue = java.util.concurrent.LinkedBlockingQueue.new(1000)
  LogHubStarter.startWorker(@endpoint, @access_id, @access_key, @project, @logstore, @consumer_group,
                            @consumer_name + @ip_suffix + @process_pid, @position, @checkpoint_second,
                            @include_meta, @blockingQueue)

  consume(queue)
rescue StandardError => e
  @logger.error("Start logstash-input-logservice", :endpoint => @endpoint, :project => @project, :logstore => @logstore,
                :consumer_group => @consumer_group, :consumer_name => @consumer_name, :position => @position,
                :checkpoint_second => @checkpoint_second, :include_meta => @include_meta, :consumer_name_with_ip => @consumer_name_with_ip, :exception => e)
end

#stop ⇒ Object



84
85
86
87
88
89
90
# File 'lib/logstash/inputs/logservice.rb', line 84

# Stop hook for this input; the body is intentionally empty.
#
# #consume checks stop? on every pass of its loop and while sleeping, so no
# extra work is done here to unblock it.
#
# NOTE(review): the worker started via LogHubStarter.startWorker in #run is
# not shut down here — confirm whether it needs an explicit stop call.
#
# @return [nil]
def stop
  # nothing to do in this case so it is not necessary to define stop
  # examples of common "stop" tasks:
  #  * close sockets (unblocking blocking reads/accepts)
  #  * cleanup temporary files
  #  * terminate spawned threads
end

#teardown ⇒ Object



92
93
94
95
# File 'lib/logstash/inputs/logservice.rb', line 92

# Teardown hook: flags the input as interrupted, then calls +finished+.
#
# NOTE(review): +finished+ is not defined in this listing — presumably it is
# inherited from the base input class; confirm. None of the methods shown
# here read @interrupted.
#
# @return [Object] whatever +finished+ returns
def teardown
  # Set the flag before signalling completion.
  @interrupted = true
  finished
end