Class: ActiveMessaging::Adapters::ReliableMsgConnection

Inherits:
BaseConnection
  • Object
show all
Defined in:
lib/activemessaging/adapters/reliable_msg.rb

Constant Summary collapse

# Symbol key; not referenced by any method visible in this file.
# NOTE(review): presumably a Thread.current key for stale transactions - confirm.
THREAD_OLD_TXS = :a13g_reliable_msg_old_txs
# Header keys that get_or_create_destination forwards to a queue's constructor.
# Frozen so the whitelist cannot be mutated at runtime.
QUEUE_PARAMS = [:expires, :delivery, :priority, :max_deliveries, :drb_uri, :tx_timeout, :connect_count].freeze
# Header keys that get_or_create_destination forwards to a topic's constructor.
# Frozen so the whitelist cannot be mutated at runtime.
TOPIC_PARAMS = [:expires, :drb_uri, :tx_timeout, :connect_count].freeze

Instance Attribute Summary collapse

Attributes inherited from BaseConnection

#reliable

Instance Method Summary collapse

Methods included from ActiveMessaging::Adapter

included, #logger

Constructor Details

#initialize(cfg) ⇒ ReliableMsgConnection

Generic init method needed by a13g (ActiveMessaging).



33
34
35
36
37
38
39
40
41
# File 'lib/activemessaging/adapters/reliable_msg.rb', line 33

# Generic init method needed by a13g; builds the connection state from cfg.
#
# @param cfg [Hash] adapter configuration. Keys read here:
#   :poll_interval - value later handed to sleep between polls (default 1)
#   :reliable      - when true, send/receive report errors instead of
#                    re-raising them (default true)
#   :tx_timeout    - timeout passed when receive begins a transaction
#                    (default ::ReliableMsg::Client::DEFAULT_TX_TIMEOUT)
def initialize(cfg)
  @poll_interval = cfg[:poll_interval] || 1
  # `cfg[:reliable] || true` could never evaluate to false, so an explicit
  # :reliable => false was ignored. Fall back to true only when unset.
  @reliable = cfg[:reliable].nil? ? true : cfg[:reliable]
  @tx_timeout = cfg[:tx_timeout] || ::ReliableMsg::Client::DEFAULT_TX_TIMEOUT

  # Internal bookkeeping, all empty until subscribe/send are called.
  @subscriptions = {}
  @destinations = {}
  @current_subscription = 0
end

Instance Attribute Details

#current_subscriptionObject

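Internal state, not a configurable param: the index, into the sorted list of subscribed destination names, of the subscription that #receive polled most recently. Starts at 0.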



30
31
32
# File 'lib/activemessaging/adapters/reliable_msg.rb', line 30

# Reader for @current_subscription. #initialize starts it at 0 and #receive
# uses it as an index into the sorted subscription names.
def current_subscription; @current_subscription; end

#destinationsObject

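Internal state, not a configurable param: hash of the destination objects created so far, keyed by destination name. Starts empty and is filled by #get_or_create_destination.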



30
31
32
# File 'lib/activemessaging/adapters/reliable_msg.rb', line 30

# Reader for @destinations, the hash (empty after #initialize) in which
# #get_or_create_destination caches destination objects by name.
def destinations; @destinations; end

#poll_intervalObject

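Configurable param (cfg[:poll_interval], default 1): the number of seconds to sleep between polling passes in #receive and after a failed send or receive.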



30
31
32
# File 'lib/activemessaging/adapters/reliable_msg.rb', line 30

# Reader for @poll_interval, set by #initialize from cfg[:poll_interval]
# (default 1) and passed to sleep by #receive.
def poll_interval; @poll_interval; end

#subscriptionsObject

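Internal state, not a configurable param: hash of the active Subscription objects, keyed by destination name. Starts empty and is maintained by #subscribe and #unsubscribe.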



30
31
32
# File 'lib/activemessaging/adapters/reliable_msg.rb', line 30

# Reader for @subscriptions, the hash (empty after #initialize) that
# #subscribe and #unsubscribe maintain, keyed by destination name.
def subscriptions; @subscriptions; end

#tx_timeoutObject

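Configurable param (cfg[:tx_timeout], default ::ReliableMsg::Client::DEFAULT_TX_TIMEOUT): the timeout passed to the queue manager when #receive begins a transaction.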



30
31
32
# File 'lib/activemessaging/adapters/reliable_msg.rb', line 30

# Reader for @tx_timeout, set by #initialize from cfg[:tx_timeout] or
# ::ReliableMsg::Client::DEFAULT_TX_TIMEOUT.
def tx_timeout; @tx_timeout; end

Instance Method Details

#disconnectObject

called to cleanly get rid of connection



44
45
46
# File 'lib/activemessaging/adapters/reliable_msg.rb', line 44

# Called to cleanly get rid of the connection. This adapter does nothing
# here; the empty body makes the method return nil.
def disconnect
end

#get_or_create_destination(destination_name, message_headers = {}) ⇒ Object



80
81
82
83
84
85
86
87
88
# File 'lib/activemessaging/adapters/reliable_msg.rb', line 80

# Returns the ReliableMsg destination object for destination_name, creating
# it and caching it in #destinations the first time the name is seen.
#
# @param destination_name [String] "/queue/<name>" or "/topic/<name>"
# @param message_headers [Hash] only keys listed in QUEUE_PARAMS (queues) or
#   TOPIC_PARAMS (topics) are forwarded to the destination's constructor.
#   As before, the "id" key is deleted from the hash that was passed in.
# @return [Object] the cached or newly built ReliableMsg::Queue / ::Topic
# @raise [ArgumentError] when destination_name does not follow the
#   "/queue/..." or "/topic/..." convention (previously this surfaced as a
#   NoMethodError on nil)
def get_or_create_destination destination_name, message_headers={}
  return destinations[destination_name] if destinations.has_key? destination_name

  # \A and \z anchor the whole string, so a valid-looking line embedded in a
  # multi-line name is not accepted.
  dd = /\A\/(queue|topic)\/(.*)\z/.match(destination_name)
  unless dd
    raise ArgumentError,
          "Invalid destination name #{destination_name.inspect}: expected '/queue/<name>' or '/topic/<name>'"
  end

  # Only "queue" or "topic" can be captured, so capitalize yields the
  # ReliableMsg class name without needing ActiveSupport's titleize.
  rm_class = dd[1].capitalize
  message_headers.delete("id")
  allowed = rm_class == 'Queue' ? QUEUE_PARAMS : TOPIC_PARAMS
  dest_headers = message_headers.reject { |key, _value| !allowed.include?(key) }
  destinations[destination_name] = ::ReliableMsg.const_get(rm_class).new(dd[2], dest_headers)
end

#receive(options = {}) ⇒ Object

Receive a single message from any of the subscribed destinations: check each destination once, then sleep for poll_interval before starting the next pass.



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/activemessaging/adapters/reliable_msg.rb', line 92

# Receive a single message from any of the subscribed destinations, polling
# them round-robin (in sorted name order) and sleeping for poll_interval each
# time the rotation comes back to where this call started.
#
# Each attempt begins a reliable-msg transaction and records it in
# Thread.current. When a message is found the transaction is left open and
# handed to the returned Message; #received or #unreceive finish it.
#
# @param options [Hash] not used by this method
# @return [Message] the next message, carrying its open transaction
# @raise [RuntimeError] when there are no subscriptions, or when the current
#   thread already has a reliable-msg transaction
# @raise [Object] whatever the destination raised, when not in reliable mode
def receive(options={})

  raise "No subscriptions to receive messages from." if (subscriptions.nil? || subscriptions.empty?)
  start = current_subscription
  while true
    # advance to the next subscription, wrapping around at the end
    self.current_subscription = ((current_subscription < subscriptions.length-1) ? current_subscription + 1 : 0)
    sleep poll_interval if (current_subscription == start)
    destination_name = subscriptions.keys.sort[current_subscription]
    destination = destinations[destination_name]
    unless destination.nil?
      # from the way we use this, assume this is the start of a transaction,
      # there should be no current transaction
      ctx = Thread.current[::ReliableMsg::Client::THREAD_CURRENT_TX]
      raise "There should not be an existing reliable-msg transaction. #{ctx.inspect}" if ctx

      # start a new transaction
      @tx = {:qm=>destination.queue_manager}
      @tx[:tid] = @tx[:qm].begin @tx_timeout
      Thread.current[::ReliableMsg::Client::THREAD_CURRENT_TX] = @tx
      begin

        # now call a get on the destination - it will use the transaction
        # the commit or the abort will occur in the received or unreceive methods
        reliable_msg = destination.get subscriptions[destination_name].headers[:selector]
        # nothing was waiting: close the empty transaction right away
        @tx[:qm].commit(@tx[:tid]) if reliable_msg.nil?

      rescue Object=>err
        begin
          # abort the transaction on error
          @tx[:qm].abort(@tx[:tid])
        ensure
          # Always drop the thread-local marker once the transaction is dead.
          # It used to stay set when the error was re-raised below, which made
          # every later receive on this thread fail with "There should not be
          # an existing reliable-msg transaction".
          Thread.current[::ReliableMsg::Client::THREAD_CURRENT_TX] = nil
        end

        raise err unless reliable
        puts "receive failed, will retry in #{@poll_interval} seconds"
        sleep poll_interval
      end
      return Message.new(reliable_msg.object, reliable_msg.id, reliable_msg.headers, destination_name, @tx) if reliable_msg

      Thread.current[::ReliableMsg::Client::THREAD_CURRENT_TX] = nil
    end
  end
end

#received(message, headers = {}) ⇒ Object

called after a message is successfully received and processed



134
135
136
137
138
139
140
141
142
143
# File 'lib/activemessaging/adapters/reliable_msg.rb', line 134

# Called after a message is successfully received and processed: commits the
# transaction carried by the message. A failing commit is only reported on
# stdout; the current thread's transaction marker is cleared either way.
#
# @param message [#transaction] message whose transaction hash holds :qm and :tid
# @param headers [Hash] not used by this method
# @return [Object, nil] the commit result, or nil when the commit raised
def received message, headers={}
  tx = message.transaction
  tx[:qm].commit(tx[:tid])
rescue Object => failure
  puts "received failed: #{failure.message}"
ensure
  Thread.current[::ReliableMsg::Client::THREAD_CURRENT_TX] = nil
end

#send(destination_name, message_body, message_headers = {}) ⇒ Object

Send a single message to a destination. destination_name is a string, message_body a string, and message_headers a hash.



69
70
71
72
73
74
75
76
77
78
# File 'lib/activemessaging/adapters/reliable_msg.rb', line 69

# Send a single message to a destination.
#
# Note that this method shadows Object#send on the connection.
#
# When the put fails and the connection is not reliable, the error is
# re-raised. In reliable mode the failure is reported, the thread sleeps for
# @poll_interval seconds, and the put is attempted again until it succeeds.
# Previously the method announced a retry but returned without retrying, so
# the message was silently dropped.
#
# @param destination_name [String] "/queue/<name>" or "/topic/<name>"
# @param message_body [Object] payload handed to the destination's put
# @param message_headers [Hash] headers handed to the destination's put
# @return [Object] whatever the destination's put returns
# @raise [Object] the put error, when not in reliable mode
def send destination_name, message_body, message_headers={}
  dest = get_or_create_destination(destination_name)
  begin
    dest.put message_body, message_headers
  rescue Object=>err
    raise err unless reliable
    puts "send failed, will retry in #{@poll_interval} seconds"
    sleep @poll_interval
    # deliberately unbounded: reliable mode keeps trying, as receive does
    retry
  end
end

#subscribe(destination_name, message_headers = {}) ⇒ Object

Subscribe to listen on a destination. destination_name is a string and message_headers a hash; use the '/destination-type/name' convention, like stomp.



51
52
53
54
55
56
57
58
# File 'lib/activemessaging/adapters/reliable_msg.rb', line 51

# Subscribe to listen on a destination, using the '/destination-type/name'
# convention. The destination is created if needed. A name that is already
# subscribed has add called on its Subscription; otherwise a new Subscription
# is stored under that name.
#
# @param destination_name [String] destination to listen on
# @param message_headers [Hash] passed to the destination lookup and to a
#   newly created Subscription
# @return [Object] the result of Subscription#add, or the new Subscription
def subscribe destination_name, message_headers={}
  get_or_create_destination(destination_name, message_headers)
  return subscriptions[destination_name].add if subscriptions.key?(destination_name)

  subscriptions[destination_name] = Subscription.new(destination_name, message_headers)
end

#unreceive(message, headers = {}) ⇒ Object

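Called to hand a received message back instead of acknowledging it: aborts the message's reliable-msg transaction and clears the current thread's transaction marker.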



146
147
148
149
150
151
152
153
154
# File 'lib/activemessaging/adapters/reliable_msg.rb', line 146

# Aborts the transaction carried by the message. A failing abort is only
# reported on stdout; the current thread's transaction marker is cleared
# either way.
#
# @param message [#transaction] message whose transaction hash holds :qm and :tid
# @param headers [Hash] not used by this method
# @return [Object, nil] the abort result, or nil when the abort raised
def unreceive message, headers={}
  tx = message.transaction
  tx[:qm].abort(tx[:tid])
rescue Object => failure
  puts "unreceive failed: #{failure.message}"
ensure
  Thread.current[::ReliableMsg::Client::THREAD_CURRENT_TX] = nil
end

#unsubscribe(destination_name, message_headers = {}) ⇒ Object

Stop listening on a destination. destination_name is a string and message_headers a hash.



62
63
64
65
# File 'lib/activemessaging/adapters/reliable_msg.rb', line 62

# Stop listening on a destination: calls remove on its Subscription and drops
# the entry from #subscriptions once the subscription's count is zero or less.
#
# Unsubscribing from a destination that is not subscribed is a no-op;
# previously it raised NoMethodError on nil.
#
# @param destination_name [String] destination to stop listening on
# @param message_headers [Hash] not used by this method
# @return [Object, nil] the deleted Subscription when the last reference was
#   released, otherwise nil
def unsubscribe destination_name, message_headers={}
  subscription = subscriptions[destination_name]
  return if subscription.nil?

  subscription.remove
  subscriptions.delete(destination_name) if subscription.count <= 0
end