Class: LogStash::Inputs::Azuretopic

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/azuretopic.rb

Overview

Reads events from Azure topics

Defined Under Namespace

Classes: Interrupted

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Azuretopic

Returns a new instance of Azuretopic.



23
24
25
# File 'lib/logstash/inputs/azuretopic.rb', line 23

# Builds the input by handing every constructor argument to the superclass
# initializer unchanged.
#
# @param args [Array] arguments forwarded as-is to the parent constructor
def initialize(*args)
  super
end

Instance Method Details

#process(output_queue) ⇒ Object

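Receives at most one message from the configured topic subscription, decodes it into events, pushes them onto output_queue, and then deletes the message.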



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/logstash/inputs/azuretopic.rb', line 46

# Receives at most one message from the subscription and turns it into events.
#
# The message is requested with :peek_lock => true and :timeout => 1. Its body
# is run through the codec; every decoded event is decorated and appended to
# output_queue, after which the message is deleted from the subscription.
#
# LogStash::ShutdownSignal is re-raised untouched. Any other StandardError is
# logged, and the message (if one was received) is deleted only when its
# delivery_count is greater than @deliverycount.
#
# @param output_queue [#<<] destination for decoded events
def process(output_queue)
  receive_options = { :peek_lock => true, :timeout => 1 }
  message = @azure_service_bus.receive_subscription_message(@topic, @subscription, receive_options)
  return unless message

  codec.decode(message.body) do |event|
    decorate(event)
    output_queue << event
  end
  @azure_service_bus.delete_subscription_message(message)
rescue LogStash::ShutdownSignal
  # Let shutdown requests travel up to the caller unchanged.
  raise
rescue => error
  @logger.error("Oh My, An error occurred.", :exception => error)
  # message is nil when the receive call itself failed, so check it first.
  over_limit = message && message.delivery_count > @deliverycount
  @azure_service_bus.delete_subscription_message(message) if over_limit
end

#register ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/logstash/inputs/azuretopic.rb', line 28

# Pushes the plugin's namespace and key settings into the Azure configuration
# and builds the Service Bus client stored in @azure_service_bus.
#
# When access_key_name is set, the client is created with a
# SharedAccessSigner against the namespace's servicebus.windows.net host
# (SAS key). Otherwise the client is created with no arguments (ACS key).
def register
  Azure.configure do |azure_config|
    azure_config.sb_namespace = @namespace
    azure_config.sb_access_key = @access_key
    azure_config.sb_sas_key_name = @access_key_name
    azure_config.sb_sas_key = @access_key
  end

  @azure_service_bus =
    if access_key_name
      # SAS key used
      sas_signer = Azure::ServiceBus::Auth::SharedAccessSigner.new
      service_host = "https://#{Azure.sb_namespace}.servicebus.windows.net"
      Azure::ServiceBus::ServiceBusService.new(service_host, { signer: sas_signer })
    else
      # ACS key
      Azure::ServiceBus::ServiceBusService.new
    end
end

#run(output_queue) ⇒ Object



65
66
67
68
69
# File 'lib/logstash/inputs/azuretopic.rb', line 65

# Main loop of the input: calls process(output_queue) repeatedly until
# stop? returns a truthy value.
#
# @param output_queue [Object] queue handed through to process
def run(output_queue)
  process(output_queue) until stop?
end

#teardown ⇒ Object



72
73
# File 'lib/logstash/inputs/azuretopic.rb', line 72

# Shutdown hook; this input has nothing to release, so it does nothing.
#
# @return [nil]
def teardown
  nil
end