Class: LogStash::Stomp::Handler

Inherits:
EventMachine::Connection
  • Object
show all
Includes:
EM::Protocols::Stomp
Defined in:
lib/logstash/stomp/handler.rb

Direct Known Subclasses

Inputs::Stomp::InputHandler

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Handler

Returns a new instance of Handler.

[View source]

14
15
16
17
18
19
20
21
22
# File 'lib/logstash/stomp/handler.rb', line 14

# Builds a handler from positional arguments.
#
# All arguments are forwarded unchanged to the superclass initializer
# (bare +super+). The first three are then captured as, in order, the
# input, the logger and the broker URL; any that are missing are left nil
# and any extras are ignored by this method.
#
# @param args [Array] positional arguments: input, logger, url
def initialize(*args)
  super

  @input, @logger, @url = args

  # A new handler starts out not ready and with subscription enabled.
  @ready = false
  @should_subscribe = true
end

Instance Attribute Details

#ready ⇒ Object

Returns the value of attribute ready.


11
12
13
# File 'lib/logstash/stomp/handler.rb', line 11

# Reader for the +ready+ attribute.
#
# Within this class the flag is set to false by the constructor and by
# #unbind, and to true by #connection_completed and by #receive_msg (on a
# CONNECTED frame when no subscription is requested).
#
# @return [Object] the current value of @ready (nil if never assigned)
def ready
  instance_variable_get(:@ready)
end

#should_subscribe ⇒ Object

Returns the value of attribute should_subscribe.


10
11
12
# File 'lib/logstash/stomp/handler.rb', line 10

# Reader for the +should_subscribe+ attribute.
#
# The constructor sets this flag to true; #receive_msg consults it to decide
# whether to subscribe to the URL's path once a CONNECTED frame arrives.
#
# @return [Object] the current value of @should_subscribe (nil if never assigned)
def should_subscribe
  instance_variable_get(:@should_subscribe)
end

Instance Method Details

#connection_completed ⇒ Object

[View source]

25
26
27
28
29
# File 'lib/logstash/stomp/handler.rb', line 25

# Logs the connection, sends the STOMP connect request using the credentials
# embedded in the broker URL, and marks the handler as ready.
#
# Presumably invoked by EventMachine once the socket is established — confirm
# against EventMachine::Connection.
#
# NOTE(review): @ready is set here before any CONNECTED frame has been seen
# by #receive_msg — confirm that callers expect this.
#
# @return [true] the value assigned to @ready
def connection_completed
  @logger.debug("Connected")

  login = @url.user
  passcode = @url.password
  connect(:login => login, :passcode => passcode)

  @ready = true
end

#receive_msg(message) ⇒ Object

[View source]

47
48
49
50
51
52
53
54
55
56
57
# File 'lib/logstash/stomp/handler.rb', line 47

# Handles an incoming frame.
#
# Every frame is logged at debug level. Only frames whose command is
# "CONNECTED" trigger further action:
# * when @should_subscribe is truthy, the handler subscribes to the path of
#   the broker URL and leaves @ready untouched;
# * otherwise @ready is set to true.
#
# @param message [#command] the received frame
# @return [true, nil] true when @ready was set, nil in every other case
def receive_msg(message)
  @logger.debug(["receiving message", { :msg => message }])

  # Anything other than the broker's CONNECTED reply is only logged.
  return unless message.command == "CONNECTED"

  # No subscription wanted: flag readiness and hand back the assigned value.
  return (@ready = true) unless @should_subscribe

  @logger.debug(["subscribing to", { :path => @url.path }])
  subscribe(@url.path)
  nil
end

#unbind ⇒ Object

[View source]

32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/logstash/stomp/handler.rb', line 32

# Reacts to the connection going away.
#
# When the global $EVENTMACHINE_STOPPING is truthy the loss is only logged at
# debug level. Otherwise it is logged as an error, @ready is cleared, and a
# one-second EventMachine::Timer is created whose block reconnects to the
# host and port of the broker URL.
#
# @return [EventMachine::Timer, nil] the retry timer, or nil when stopping
def unbind
  if $EVENTMACHINE_STOPPING
    @logger.debug(["Connection to stomp broker died (probably since we are exiting)",
                   { :url => @url }])
    nil
  else
    @logger.error(["Connection to stomp broker died, retrying.", { :url => @url }])
    @ready = false
    EventMachine::Timer.new(1) { reconnect(@url.host, @url.port) }
  end
end