Class: LogStash::Inputs::LogService

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/logservice.rb

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ LogService

attr_accessor :worker



17
18
19
# File 'lib/logstash/inputs/logservice.rb', line 17

# Constructs the input plugin.
#
# No state is set up here; every argument is handed on unchanged to the
# parent class initializer.
#
# @param args [Array] arguments forwarded to the superclass constructor
def initialize(*args)
  # Bare `super` re-sends the arguments this method received.
  super
end

Instance Method Details

#consume(queue) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/logstash/inputs/logservice.rb', line 78

# Moves messages from @blockingQueue onto the pipeline queue until the
# plugin reports that it should stop.
#
# Each polled message is wrapped in a LogStash::Event, decorated, and
# appended to +queue+. When the internal queue is empty the loop sleeps
# for 10 ms before checking again.
#
# A message that fails to convert or enqueue is logged and dropped; the loop
# then re-checks the queue instead of blindly retrying, so a failure on the
# last queued message can no longer emit an event built from a nil poll, and
# a persistent failure can no longer spin without consulting stop?.
#
# @param queue [#<<] destination that receives each decorated event
# @return [nil]
def consume(queue)
  until stop?
    until @blockingQueue.isEmpty
      begin
        msg = @blockingQueue.poll
        # poll hands back nil when nothing is available (the queue may have
        # been drained after the isEmpty check); there is nothing to emit.
        next if msg.nil?

        event = LogStash::Event.new(msg)
        decorate(event)
        queue << event
      rescue StandardError => e
        # Only StandardError is rescued so interrupts and exit requests are
        # not swallowed. No `retry`: control returns to the inner loop
        # condition, which re-checks the queue before polling again.
        @logger.error("Consume logstash-input-logservice", :endpoint => @endpoint, :project => @project, :logstore => @logstore,
                      :consumer_group => @consumer_group, :consumer_name => @consumer_name, :position => @position,
                      :checkpoint_second => @checkpoint_second, :include_meta => @include_meta,
                      :consumer_name_with_ip => @consumer_name_with_ip, :exception => e)
      end
    end
    sleep(0.01)
  end
end

#register ⇒ Object



48
49
50
51
52
# File 'lib/logstash/inputs/logservice.rb', line 48

# Writes a single info-level log entry listing the configuration values
# held in this plugin's instance variables. Performs no other setup.
#
# @return [Object] whatever the logger's #info call returns
def register
  @logger.info(
    "Init logstash-input-logservice",
    endpoint: @endpoint,
    project: @project,
    logstore: @logstore,
    consumer_group: @consumer_group,
    consumer_name: @consumer_name,
    position: @position,
    checkpoint_second: @checkpoint_second,
    include_meta: @include_meta,
    consumer_name_with_ip: @consumer_name_with_ip
  )
end

#run(queue) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/logstash/inputs/logservice.rb', line 54

# Starts the LogHub worker and then drains its output into +queue+.
#
# Steps, in order:
# 1. Resolve the local host address and derive the consumer-name suffixes
#    (the address suffix is only used when @consumer_name_with_ip is truthy;
#    the process id suffix is always appended).
# 2. Log the effective configuration.
# 3. Create a LinkedBlockingQueue constructed with 1000 and a LogHubStarter,
#    and start the worker with the configured settings.
# 4. Hand control to #consume.
#
# Any exception raised along the way is logged at error level and not
# re-raised.
#
# @param queue [Object] pipeline queue passed through to #consume
# @return [Object] the result of #consume, or of the error log call on failure
def run(queue)
  begin
    @local_address = java.net.InetAddress.getLocalHost.getHostAddress
    @ip_suffix = ''
    @ip_suffix = '_' + @local_address if @consumer_name_with_ip
    @process_pid = '_' + Process.pid.to_s

    @logger.info(
      "Running logstash-input-logservice",
      endpoint: @endpoint,
      project: @project,
      logstore: @logstore,
      consumer_group: @consumer_group,
      consumer_name: @consumer_name,
      position: @position,
      checkpoint_second: @checkpoint_second,
      include_meta: @include_meta,
      consumer_name_with_ip: @consumer_name_with_ip,
      local_address: @local_address
    )

    @blockingQueue = java.util.concurrent.LinkedBlockingQueue.new(1000)
    @logHubStarter = LogHubStarter.new
    # Consumer name registered with the worker: base name plus optional
    # address suffix plus process id suffix.
    worker_consumer_name = @consumer_name + @ip_suffix + @process_pid
    @logHubStarter.startWorker(
      @endpoint, @access_id, @access_key, @project, @logstore, @consumer_group,
      worker_consumer_name, @position, @checkpoint_second, @include_meta, @blockingQueue,
      @proxy_host, @proxy_port, @proxy_username, @proxy_password, @proxy_domain,
      @proxy_workstation, @fetch_interval_millis
    )

    consume(queue)
  rescue Exception => e
    @logger.error(
      "Start logstash-input-logservice",
      endpoint: @endpoint,
      project: @project,
      logstore: @logstore,
      consumer_group: @consumer_group,
      consumer_name: @consumer_name,
      position: @position,
      checkpoint_second: @checkpoint_second,
      include_meta: @include_meta,
      consumer_name_with_ip: @consumer_name_with_ip,
      exception: e
    )
  end
end

#stop ⇒ Object



97
98
99
# File 'lib/logstash/inputs/logservice.rb', line 97

# Asks the LogHub worker to stop.
#
# In the code visible here @logHubStarter is assigned inside #run, so it is
# still nil if #stop is invoked before #run got that far (for example when
# #run failed early). In that case there is no worker to stop and the call
# is a no-op instead of raising NoMethodError.
#
# @return [Object, nil] the result of stopWorker, or nil when no worker exists
def stop
  @logHubStarter.stopWorker if @logHubStarter
end

#teardown ⇒ Object



101
102
103
104
# File 'lib/logstash/inputs/logservice.rb', line 101

# Marks the plugin as interrupted and then invokes #finished.
#
# @return [Object] whatever #finished returns
def teardown
  # Set the flag first so it is already true by the time #finished runs.
  @interrupted = true
  finished()
end