Class: ActiveMessaging::Adapters::Adapter::Connection
- Inherits:
-
BaseConnection
- Object
- BaseConnection
- ActiveMessaging::Adapters::Adapter::Connection
- Defined in:
- lib/activemessaging/adapters/wmq.rb
Overview
Connection class needed by a13g
Instance Attribute Summary
Attributes inherited from BaseConnection
Instance Method Summary collapse
-
#disconnect(headers = {}) ⇒ Object
Disconnect method needed by a13g No need to disconnect from the queue manager since connection and disconnection occurs inside the send and receive methods headers is never used.
-
#initialize(cfg) ⇒ Connection
constructor
Generic init method needed by a13g.
-
#receive(options = {}) ⇒ Object
Receive method needed by a13g.
-
#received(message, headers = {}) ⇒ Object
called after a message is successfully received and processed.
-
#send(q_name, message_data, headers = {}) ⇒ Object
Send method needed by a13g headers may contains 2 different hashes to gives more control over the sending process :descriptor => … to populate the descriptor of the message :put_options => … to specify the put options for that message.
-
#subscribe(q_name, headers = {}, subId = NIL) ⇒ Object
Subscribe method needed by a13g headers may contains a hash to give more control over the get operation on the queue :get_options => … to specify the get options when receiving messages Warning : get options are set only on the first queue subscription and are common to all the queue’s subscriptions Any other get options passed with subsequent subscribe on an existing queue will be discarded subId is never used.
-
#unreceive(message, headers = {}) ⇒ Object
called after a message is successfully received but unsuccessfully processed purpose is to return the message to the destination so receiving and processing and be attempted again.
-
#unsubscribe(q_name, headers = {}, subId = NIL) ⇒ Object
Unsubscribe method needed by a13g Stop listening the queue only after the last unsubscription headers is never used subId is never used.
Methods included from ActiveMessaging::Adapter
Constructor Details
#initialize(cfg) ⇒ Connection
Generic init method needed by a13g
33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/activemessaging/adapters/wmq.rb', line 33 def initialize(cfg) # Set default values cfg[:poll_interval] ||= 0.1 # Initialize instance members # Trick for the connection_options is to allow settings WMQ constants directly in broker.yml :)) @connection_options = cfg.each_pair {|key, value| cfg[key] = instance_eval(value) if (value.instance_of?(String) && value.match("WMQ::")) } @queue_names = [] @current_queue = 0 @queues = {} end |
Instance Method Details
#disconnect(headers = {}) ⇒ Object
Disconnect method needed by a13g No need to disconnect from the queue manager since connection and disconnection occurs inside the send and receive methods headers is never used
48 49 |
# File 'lib/activemessaging/adapters/wmq.rb', line 48 def disconnect(headers = {}) end |
#receive(options = {}) ⇒ Object
Receive method needed by a13g
52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/activemessaging/adapters/wmq.rb', line 52 def receive(={}) raise "No subscription to receive messages from" if (@queue_names.nil? || @queue_names.empty?) start = @current_queue while true @current_queue = ((@current_queue < @queue_names.length-1) ? @current_queue + 1 : 0) sleep(@connection_options[:poll_interval]) if (@current_queue == start) q = @queues[@queue_names[@current_queue]] unless q.nil? = (q) return unless .nil? end end end |
#received(message, headers = {}) ⇒ Object
called after a message is successfully received and processed
117 118 |
# File 'lib/activemessaging/adapters/wmq.rb', line 117 def received , headers={} end |
#send(q_name, message_data, headers = {}) ⇒ Object
Send method needed by a13g headers may contains 2 different hashes to gives more control over the sending process
:descriptor => {...} to populate the descriptor of the message
:put_options => {...} to specify the put options for that message
70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/activemessaging/adapters/wmq.rb', line 70 def send(q_name, , headers={}) WMQ::QueueManager.connect(@connection_options) do |qmgr| qmgr.open_queue(:q_name => q_name, :mode => :output) do |queue| = headers[:descriptor] || {:format => WMQ::MQFMT_STRING} = headers[:put_options].nil? ? {} : headers[:put_options].dup = WMQ::Message.new(:data => , :descriptor => ) queue.put(.merge(:message => , :data => nil)) return Message.new(, q_name) end end end |
#subscribe(q_name, headers = {}, subId = NIL) ⇒ Object
Subscribe method needed by a13g headers may contains a hash to give more control over the get operation on the queue
:get_options => {...} to specify the get options when receiving messages
Warning : get options are set only on the first queue subscription and are common to all the queue's subscriptions
Any other get options passed with subsequent subscribe on an existing queue will be discarded
subId is never used
90 91 92 93 94 95 96 97 98 99 |
# File 'lib/activemessaging/adapters/wmq.rb', line 90 def subscribe(q_name, headers={}, subId=NIL) if @queues[q_name].nil? = headers[:get_options] || {} q = Queue.new(q_name, ) @queues[q_name] = q @queue_names << q.name end q.add_subscription end |
#unreceive(message, headers = {}) ⇒ Object
called after a message is successfully received but unsuccessfully processed purpose is to return the message to the destination so receiving and processing and be attempted again
122 123 |
# File 'lib/activemessaging/adapters/wmq.rb', line 122 def unreceive , headers={} end |
#unsubscribe(q_name, headers = {}, subId = NIL) ⇒ Object
Unsubscribe method needed by a13g Stop listening the queue only after the last unsubscription headers is never used subId is never used
105 106 107 108 109 110 111 112 113 114 |
# File 'lib/activemessaging/adapters/wmq.rb', line 105 def unsubscribe(q_name, headers={}, subId=NIL) q = @queues[q_name] unless q.nil? q.remove_subscription unless q.has_subscription? @queues.delete(q_name) @queue_names.delete(q_name) end end end |