Class: ActiveMessaging::Adapters::Adapter::Connection

Inherits:
BaseConnection
  • Object
show all
Defined in:
lib/activemessaging/adapters/wmq.rb

Overview

Connection class needed by a13g

Instance Attribute Summary

Attributes inherited from BaseConnection

#reliable

Instance Method Summary collapse

Methods included from ActiveMessaging::Adapter

included, #logger

Constructor Details

#initialize(cfg) ⇒ Connection

Generic init method needed by a13g



33
34
35
36
37
38
39
40
41
42
43
# File 'lib/activemessaging/adapters/wmq.rb', line 33

def initialize(cfg)
  # Set default values
  cfg[:poll_interval] ||= 0.1

  # Initialize instance members
  # Trick for the connection_options is to allow settings WMQ constants directly in broker.yml :))
  @connection_options = cfg.each_pair {|key, value| cfg[key] = instance_eval(value) if (value.instance_of?(String) && value.match("WMQ::")) }
  @queue_names = []
  @current_queue = 0
  @queues = {}
end

Instance Method Details

#disconnect(headers = {}) ⇒ Object

Disconnect method needed by a13g No need to disconnect from the queue manager since connection and disconnection occurs inside the send and receive methods headers is never used



48
49
# File 'lib/activemessaging/adapters/wmq.rb', line 48

def disconnect(headers = {})
end

#receive(options = {}) ⇒ Object

Receive method needed by a13g



52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/activemessaging/adapters/wmq.rb', line 52

def receive(options={})
  raise "No subscription to receive messages from" if (@queue_names.nil? || @queue_names.empty?)
  start = @current_queue
  while true
    @current_queue = ((@current_queue < @queue_names.length-1) ? @current_queue + 1 : 0)
    sleep(@connection_options[:poll_interval]) if (@current_queue == start)
    q = @queues[@queue_names[@current_queue]]
    unless q.nil?
      message = retrieve_message(q)
      return message unless message.nil?
    end
  end
end

#received(message, headers = {}) ⇒ Object

called after a message is successfully received and processed



117
118
# File 'lib/activemessaging/adapters/wmq.rb', line 117

def received message, headers={}
end

#send(q_name, message_data, headers = {}) ⇒ Object

Send method needed by a13g headers may contains 2 different hashes to gives more control over the sending process

:descriptor => {...} to populate the descriptor of the message
:put_options => {...} to specify the put options for that message


70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/activemessaging/adapters/wmq.rb', line 70

def send(q_name, message_data, headers={})
  WMQ::QueueManager.connect(@connection_options) do |qmgr|
    qmgr.open_queue(:q_name => q_name, :mode => :output) do |queue|

      message_descriptor = headers[:descriptor] || {:format => WMQ::MQFMT_STRING}
      put_options = headers[:put_options].nil? ? {} : headers[:put_options].dup

      wmq_message = WMQ::Message.new(:data => message_data, :descriptor => message_descriptor)
      queue.put(put_options.merge(:message => wmq_message, :data => nil))
      return Message.new(wmq_message, q_name)
    end
  end
end

#subscribe(q_name, headers = {}, subId = NIL) ⇒ Object

Subscribe method needed by a13g headers may contains a hash to give more control over the get operation on the queue

:get_options => {...} to specify the get options when receiving messages
Warning : get options are set only on the first queue subscription and are common to all the queue's subscriptions
          Any other get options passed with subsequent subscribe on an existing queue will be discarded

subId is never used



90
91
92
93
94
95
96
97
98
99
# File 'lib/activemessaging/adapters/wmq.rb', line 90

def subscribe(q_name, headers={}, subId=NIL)
  if @queues[q_name].nil?
    get_options = headers[:get_options] || {}
    q = Queue.new(q_name, get_options)
    @queues[q_name] = q
    @queue_names << q.name
  end

  q.add_subscription
end

#unreceive(message, headers = {}) ⇒ Object

called after a message is successfully received but unsuccessfully processed purpose is to return the message to the destination so receiving and processing and be attempted again



122
123
# File 'lib/activemessaging/adapters/wmq.rb', line 122

def unreceive message, headers={}
end

#unsubscribe(q_name, headers = {}, subId = NIL) ⇒ Object

Unsubscribe method needed by a13g Stop listening the queue only after the last unsubscription headers is never used subId is never used



105
106
107
108
109
110
111
112
113
114
# File 'lib/activemessaging/adapters/wmq.rb', line 105

def unsubscribe(q_name, headers={}, subId=NIL)
  q = @queues[q_name]
  unless q.nil?
    q.remove_subscription
    unless q.has_subscription?
      @queues.delete(q_name)
      @queue_names.delete(q_name)
    end
  end
end