Class: LogStash::Inputs::Azuretopicthreadable
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::Azuretopicthreadable
- Defined in:
- lib/logstash/inputs/azuretopicthreadable.rb
Overview
Reads events from Azure topics
Defined Under Namespace
Classes: Interrupted
Instance Method Summary collapse
-
#initialize(*args) ⇒ Azuretopicthreadable
constructor
A new instance of Azuretopicthreadable.
-
#process(output_queue, pid) ⇒ Object
def register.
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #teardown ⇒ Object
Constructor Details
#initialize(*args) ⇒ Azuretopicthreadable
Returns a new instance of Azuretopicthreadable.
26 27 28 |
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 26 def initialize(*args) super(*args) end |
Instance Method Details
#process(output_queue, pid) ⇒ Object
def register
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 39 def process(output_queue, pid) # Get a new instance of a service azure_service_bus = Azure::ServiceBus::ServiceBusService.new while true begin # check if we have a message in the subscription = azure_service_bus.(@topic ,@subscription, { :peek_lock => true, :timeout => 1 } ) if # decoding returns a yield codec.decode(.body) do |event| output_queue << event end # codec.decode # delete the message after reading it azure_service_bus.() end rescue LogStash::ShutdownSignal => e raise e rescue => e @logger.error("Oh My, An error occurred. Thread id:" + pid.to_s, :exception => e) if and .delivery_count > @deliverycount azure_service_bus.() end end sleep(@thread_sleep_time) end end |
#register ⇒ Object
31 32 33 34 35 36 37 |
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 31 def register # Configure credentials Azure.configure do |config| config.sb_namespace = @namespace config.sb_access_key = @access_key end end |
#run(output_queue) ⇒ Object
67 68 69 70 71 72 73 |
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 67 def run(output_queue) threads = [] (0..(@threads-1)).each do |pid| threads << Thread.new { process(output_queue, pid) } end threads.each { |thr| thr.join } end |
#teardown ⇒ Object
76 77 |
# File 'lib/logstash/inputs/azuretopicthreadable.rb', line 76 def teardown end |