Class: Simrpc::QpidAdapter::Node

Inherits:
Object
  • Object
show all
Defined in:
lib/simrpc/qpid_adapter.rb

Overview

Simrpc::Qpid::Node class, represents an enpoint on a qpid network which has its own exchange and queue which it listens on

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ Node

create the qpid base connection with the specified broker / port or config file. Then establish exchange and queue and start listening for requests.

specify :broker and :port arguments to directly connect to those specify :config argument to use that yml file specify MOTEL_AMQP_CONF environment variable to use that yml file specify :id parameter to set id, else it will be set to a uuid just created



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/simrpc/qpid_adapter.rb', line 62

def initialize(args = {})
   # if no id specified generate a new uuid
   @node_id = args[:id].nil? ? gen_uuid : args[:id]

   # we generate a random session id
   @session_id = gen_uuid

   # get the broker/port
   broker = args[:broker].nil? ? "localhost"  : args[:broker]
   port   = args[:port].nil? ? 5672  : args[:port]

   if (broker.nil? || port.nil?) && args.has_key?(:config)
     config      =
     amqpconfig = YAML::load(File.open(args[:config]))
     broker = amqpconfig["broker"] if broker.nil?
     port   = amqpconfig["port"]   if port.nil?
   end

   ### create underlying tcp connection
   # NOTE pretty big bug in ruby/qpid preventing us from using 'inspect'
   # on this class (causes segfault)
   # https://issues.apache.org/jira/browse/QPID-2405
   @conn = Qpid::Connection.new(TCPSocket.new(broker,port))
   @conn.start

   ### connect to qpid broker
   @ssn = @conn.session(@session_id)

   @children = {}

   # threads pool to handle incoming requests
   # FIXME make the # of threads and timeout configurable)
   @thread_pool = ThreadPool.new(10, :timeout => 5)

   @accept_lock = Semaphore.new(1)

   # qpid constructs that will be created for node
   @exchange     = args[:exchange].nil?    ? @node_id.to_s + "-exchange"    : args[:exchange]
   @queue        = args[:queue].nil?       ? @node_id.to_s + "-queue"       : args[:queue]
   @local_queue  = args[:local_queue].nil? ? @node_id.to_s + "-local-queue" : args[:local_queue]
   @routing_key  = @queue

   Logger.warn "creating qpid exchange #{@exchange} queue #{@queue} binding_key #{@routing_key}"

   if @ssn.exchange_query(@exchange).not_found
     @ssn.exchange_declare(@exchange, :type => "direct")
   end

   if @ssn.queue_query(@queue).queue.nil?
     @ssn.queue_declare(@queue)
   end

   @ssn.exchange_bind(:exchange => @exchange,
                      :queue    => @queue,
                      :binding_key => @routing_key)
end

Instance Attribute Details

#childrenObject

a node can have children nodes mapped to by keys



49
50
51
# File 'lib/simrpc/qpid_adapter.rb', line 49

def children
  @children
end

#node_idObject (readonly)

node always has a node id



52
53
54
# File 'lib/simrpc/qpid_adapter.rb', line 52

def node_id
  @node_id
end

Instance Method Details

#async_accept(&handler) ⇒ Object

Instruct Node to start accepting requests asynchronously and immediately return. handler must be callable and take node, msg, respond_to arguments, corresponding to ‘self’, the message received’, and the routing_key which to send any response.



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/simrpc/qpid_adapter.rb', line 122

def async_accept(&handler)
   # TODO permit a QpidNode to accept messages from multiple exchanges/queues
   @accept_lock.wait

   # subscribe to the queue
   @ssn.message_subscribe(:destination => @local_queue,
                          :queue => @queue,
                          :accept_mode => @ssn.message_accept_mode.none)
   @incoming = @ssn.incoming(@local_queue)
   @incoming.start

   Logger.warn "listening for messages on #{@queue}"

   # start receiving messages
   @incoming.listen{ |msg|
      Logger.info "queue #{@queue} received message #{msg.body}"
      reply_to = msg.get(:message_properties).reply_to.routing_key
      # FIXME should delete handler threads as they complete
      # FIXME handler timeout
      job = ThreadPoolJob.new { handler.call(self, msg.body, reply_to) }
      @thread_pool << job
   }
end

#joinObject

block until accept operation is complete



147
148
149
150
# File 'lib/simrpc/qpid_adapter.rb', line 147

def join
   @accept_lock.wait
   #FIXME @thread_pool.join
end

#send_message(routing_key, message) ⇒ Object

send a message to the specified routing_key



168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/simrpc/qpid_adapter.rb', line 168

def send_message(routing_key, message)
  dp = @ssn.delivery_properties(:routing_key => routing_key)
  mp = @ssn.message_properties( :content_type => "text/plain")
  rp = @ssn.message_properties( :reply_to =>
                                @ssn.reply_to(@exchange, @routing_key))
  msg = Qpid::Message.new(dp, mp, rp, message.to_s)

  Logger.warn "sending qpid message #{msg.body} to #{routing_key}"

  # send it
  @ssn.message_transfer(:message => msg)
end

#terminateObject

instructs QpidServer to stop accepting, blocking untill all accepting operations have terminated



154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/simrpc/qpid_adapter.rb', line 154

def terminate
  Logger.warn "terminating qpid session"
  @thread_pool.stop
  unless @incoming.nil?
    @incoming.stop
    @incoming.close
    @accept_lock.signal
  end
  @ssn.queue_delete(@queue)
  @ssn.exchange_delete(@exchange)
  @ssn.close
end