Class: RosettaQueue::Gateway::StompAdapter

Inherits:
BaseAdapter
  • Object
show all
Defined in:
lib/rosetta_queue/adapters/stomp.rb

Instance Method Summary collapse

Constructor Details

#initialize(adapter_settings = {}) ⇒ StompAdapter

Returns a new instance of StompAdapter.

Raises:



13
14
15
16
17
18
19
20
# File 'lib/rosetta_queue/adapters/stomp.rb', line 13

def initialize(adapter_settings = {})
  raise AdapterException, "Missing adapter settings" if adapter_settings.empty?
  @conn = Stomp::Connection.open(adapter_settings[:user],
                                 adapter_settings[:password],
                                 adapter_settings[:host],
                                 adapter_settings[:port],
                                 true)
end

Instance Method Details

#ack(msg) ⇒ Object

Raises:



8
9
10
11
# File 'lib/rosetta_queue/adapters/stomp.rb', line 8

def ack(msg)
  raise AdapterException, "Unable to ack client because message-id is blank.  Are your message handler options correct? (i.e., :ack => 'client')" if msg.headers["message-id"].nil?
  @conn.ack(msg.headers["message-id"])
end

#disconnectObject



22
23
24
25
# File 'lib/rosetta_queue/adapters/stomp.rb', line 22

def disconnect
  unsubscribe if @destination
  @conn.disconnect
end

#receive(options) ⇒ Object



27
28
29
30
31
# File 'lib/rosetta_queue/adapters/stomp.rb', line 27

def receive(options)
  msg = @conn.receive
  ack(msg) unless options[:ack].nil?
  msg
end

#receive_once(destination, opts) ⇒ Object



33
34
35
36
37
38
39
40
# File 'lib/rosetta_queue/adapters/stomp.rb', line 33

def receive_once(destination, opts)
  @destination, @options = destination, opts
  subscribe
  msg = receive(@options).body
  unsubscribe
  RosettaQueue.logger.info("Receiving from #{@destination} :: #{msg}")
  filter_receiving(msg)
end

#receive_with(message_handler) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/rosetta_queue/adapters/stomp.rb', line 42

def receive_with(message_handler)
  @destination, @options = destination_for(message_handler), options_for(message_handler)
  subscribe
  running do
    msg = receive(@options).body
    Thread.current[:processing] = true
    RosettaQueue.logger.info("Receiving from #{@destination} :: #{msg}")
    message_handler.handle_message(msg)
    Thread.current[:processing] = false
  end
end

#send_message(destination, message, options) ⇒ Object



54
55
56
57
58
# File 'lib/rosetta_queue/adapters/stomp.rb', line 54

def send_message(destination, message, options)
  @destination = destination
  RosettaQueue.logger.info("Publishing to #{@destination} :: #{message}")
  @conn.send(@destination, message, options)
end

#subscribeObject



60
61
62
# File 'lib/rosetta_queue/adapters/stomp.rb', line 60

def subscribe
  @conn.subscribe(@destination, @options)
end

#unsubscribeObject



64
65
66
# File 'lib/rosetta_queue/adapters/stomp.rb', line 64

def unsubscribe
  @conn.unsubscribe(@destination)
end