Class: Simrpc::QpidAdapter::Node
- Inherits: Object
- Inheritance chain: Object, Simrpc::QpidAdapter::Node
- Defined in: lib/simrpc/qpid_adapter.rb
Overview
The Simrpc::QpidAdapter::Node class represents an endpoint on a qpid network. Each node has its own exchange and queue, which it listens on.
Instance Attribute Summary
- #children ⇒ Object
  A node can have child nodes, mapped to by keys.
- #node_id ⇒ Object (readonly)
  A node always has a node id.
Instance Method Summary
- #async_accept(&handler) ⇒ Object
  Instructs the Node to start accepting requests asynchronously and return immediately.
- #initialize(args = {}) ⇒ Node (constructor)
  Creates the qpid base connection with the specified broker / port or config file.
- #join ⇒ Object
  Blocks until the accept operation is complete.
- #send_message(routing_key, message) ⇒ Object
  Sends a message to the specified routing_key.
- #terminate ⇒ Object
  Instructs the Node to stop accepting, blocking until all accepting operations have terminated.
Constructor Details
#initialize(args = {}) ⇒ Node
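Creates the qpid base connection with the specified broker / port or config file, then declares the exchange and queue and binds them together. Listening for requests starts with #async_accept.
- Specify the :broker and :port arguments to connect to those directly.
- Specify the :config argument to use that yml file.
- Specify the MOTEL_AMQP_CONF environment variable to use that yml file.
- Specify the :id parameter to set the node id; otherwise it is set to a newly generated uuid.
In the source listing, :broker defaults to "localhost" and :port to 5672, and the :exchange, :queue and :local_queue arguments override the names otherwise derived from the node id.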
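The lookup order described above can be sketched as a small standalone helper. This is only an illustration of the documented intent, not the adapter's code: `resolve_broker` is a hypothetical name, the `env` parameter exists only to make the sketch testable, and the "broker" / "port" keys and the localhost:5672 defaults are taken from the constructor source. The listed constructor applies its defaults before it looks at the config file, so its behaviour may differ from this sketch.

```ruby
require "yaml"

# Hypothetical helper, not part of Simrpc: resolves the broker host and port
# in the order the documentation describes.
def resolve_broker(args = {}, env = ENV)
  broker = args[:broker]
  port   = args[:port]

  # Fall back to a YAML config file: the :config argument first, then the
  # MOTEL_AMQP_CONF environment variable.
  config_path = args[:config] || env["MOTEL_AMQP_CONF"]
  if (broker.nil? || port.nil?) && config_path
    config = YAML.load_file(config_path) || {}
    broker ||= config["broker"]
    port   ||= config["port"]
  end

  # Defaults as they appear in the constructor source.
  [broker || "localhost", port || 5672]
end
```

Called with no arguments and no environment variable set, the helper falls through to the defaults; explicit :broker and :port values always win over the file.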
    # File 'lib/simrpc/qpid_adapter.rb', line 62
    def initialize(args = {})
      # if no id specified generate a new uuid
      @node_id = args[:id].nil? ? gen_uuid : args[:id]

      # we generate a random session id
      @session_id = gen_uuid

      # get the broker/port
      broker = args[:broker].nil? ? "localhost" : args[:broker]
      port = args[:port].nil? ? 5672 : args[:port]

      if (broker.nil? || port.nil?) && args.has_key?(:config)
        config = amqpconfig = YAML::load(File.open(args[:config]))
        broker = amqpconfig["broker"] if broker.nil?
        port = amqpconfig["port"] if port.nil?
      end

      ### create underlying tcp connection
      # NOTE pretty big bug in ruby/qpid preventing us from using 'inspect'
      # on this class (causes segfault)
      # https://issues.apache.org/jira/browse/QPID-2405
      @conn = Qpid::Connection.new(TCPSocket.new(broker,port))
      @conn.start

      ### connect to qpid broker
      @ssn = @conn.session(@session_id)

      @children = {}

      # threads pool to handle incoming requests
      # FIXME make the # of threads and timeout configurable)
      @thread_pool = ThreadPool.new(10, :timeout => 5)

      @accept_lock = Semaphore.new(1)

      # qpid constructs that will be created for node
      @exchange = args[:exchange].nil? ? @node_id.to_s + "-exchange" : args[:exchange]
      @queue = args[:queue].nil? ? @node_id.to_s + "-queue" : args[:queue]
      @local_queue = args[:local_queue].nil? ? @node_id.to_s + "-local-queue" : args[:local_queue]
      @routing_key = @queue

      Logger.warn "creating qpid exchange #{@exchange} queue #{@queue} binding_key #{@routing_key}"

      if @ssn.exchange_query(@exchange).not_found
        @ssn.exchange_declare(@exchange, :type => "direct")
      end

      if @ssn.queue_query(@queue).queue.nil?
        @ssn.queue_declare(@queue)
      end

      @ssn.exchange_bind(:exchange => @exchange, :queue => @queue, :binding_key => @routing_key)
    end
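To make the naming rules in the constructor easier to see, here is a minimal sketch that computes the same names without touching a broker. `qpid_names` is a hypothetical helper, and `SecureRandom.uuid` stands in for the adapter's own `gen_uuid`, whose implementation is not shown on this page.

```ruby
require "securerandom"

# Hypothetical helper mirroring the naming rules in the constructor.
def qpid_names(args = {})
  node_id = args[:id] || SecureRandom.uuid # stand-in for gen_uuid
  queue   = args[:queue] || "#{node_id}-queue"
  {
    :node_id     => node_id,
    :exchange    => args[:exchange] || "#{node_id}-exchange",
    :queue       => queue,
    :local_queue => args[:local_queue] || "#{node_id}-local-queue",
    :routing_key => queue # the queue name doubles as the binding key
  }
end

names = qpid_names(:id => "server")
names[:exchange]    # => "server-exchange"
names[:routing_key] # => "server-queue"
```

Because the routing key is the queue name, a peer that knows a node's id (and that the defaults were used) can work out where to send messages for it.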
Instance Attribute Details
#children ⇒ Object
A node can have child nodes, mapped to by keys.
    # File 'lib/simrpc/qpid_adapter.rb', line 49
    def children
      @children
    end
#node_id ⇒ Object (readonly)
A node always has a node id.
    # File 'lib/simrpc/qpid_adapter.rb', line 52
    def node_id
      @node_id
    end
Instance Method Details
#async_accept(&handler) ⇒ Object
Instructs the Node to start accepting requests asynchronously and return immediately. The handler must be callable and take node, msg, and respond_to arguments, corresponding to 'self', the message received, and the routing_key to which any response should be sent.
    # File 'lib/simrpc/qpid_adapter.rb', line 122
    def async_accept(&handler)
      # TODO permit a QpidNode to accept messages from multiple exchanges/queues
      @accept_lock.wait

      # subscribe to the queue
      @ssn.message_subscribe(:destination => @local_queue,
                             :queue => @queue,
                             :accept_mode => @ssn.message_accept_mode.none)
      @incoming = @ssn.incoming(@local_queue)
      @incoming.start

      Logger.warn "listening for messages on #{@queue}"

      # start receiving messages
      @incoming.listen{ |msg|
        Logger.info "queue #{@queue} received message #{msg.body}"
        reply_to = msg.get(:message_properties).reply_to.routing_key
        # FIXME should delete handler threads as they complete
        # FIXME handler timeout
        job = ThreadPoolJob.new { handler.call(self, msg.body, reply_to) }
        @thread_pool << job
      }
    end
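A handler with the three-argument shape described above might look like the following sketch. It runs without a broker: `FakeNode` is a hypothetical stand-in that only records what would be sent, and `echo_handler` is an illustrative name.

```ruby
# Hypothetical stand-in for a Node: records what would be sent instead of
# talking to a broker.
class FakeNode
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_message(routing_key, message)
    @sent << [routing_key, message.to_s]
  end
end

# A handler taking the node, the message body, and the routing key to reply to.
echo_handler = lambda do |node, msg, reply_to|
  node.send_message(reply_to, "echo: #{msg}")
end

node = FakeNode.new
echo_handler.call(node, "ping", "client-queue")
node.sent # => [["client-queue", "echo: ping"]]
```

With a real node, the same lambda could presumably be passed as the block, along the lines of node.async_accept(&echo_handler), since the listing calls the handler with exactly these three arguments.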
#join ⇒ Object
Blocks until the accept operation is complete.
    # File 'lib/simrpc/qpid_adapter.rb', line 147
    def join
      @accept_lock.wait
      #FIXME @thread_pool.join
    end
#send_message(routing_key, message) ⇒ Object
Sends a message to the specified routing_key.
    # File 'lib/simrpc/qpid_adapter.rb', line 168
    def send_message(routing_key, message)
      dp = @ssn.delivery_properties(:routing_key => routing_key)
      mp = @ssn.message_properties( :content_type => "text/plain")
      rp = @ssn.message_properties( :reply_to => @ssn.reply_to(@exchange, @routing_key))
      msg = Qpid::Message.new(dp, mp, rp, message.to_s)

      Logger.warn "sending qpid message #{msg.body} to #{routing_key}"

      # send it
      @ssn.message_transfer(:message => msg)
    end
#terminate ⇒ Object
Instructs the Node to stop accepting, blocking until all accepting operations have terminated.
    # File 'lib/simrpc/qpid_adapter.rb', line 154
    def terminate
      Logger.warn "terminating qpid session"
      @thread_pool.stop
      unless @incoming.nil?
        @incoming.stop
        @incoming.close
        @accept_lock.signal
      end
      @ssn.queue_delete(@queue)
      @ssn.exchange_delete(@exchange)
      @ssn.close
    end
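The way #async_accept, #join and #terminate cooperate through @accept_lock can be sketched in isolation. The adapter's Semaphore class is not shown on this page, so `TinySemaphore` below is a hypothetical counting semaphore that is only assumed to behave similarly (wait takes a permit, signal returns one).

```ruby
# Minimal counting semaphore, a hypothetical stand-in for the Semaphore
# class the adapter uses.
class TinySemaphore
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
  end

  # Take a permit, blocking while none is available.
  def wait
    @mutex.synchronize do
      @cond.wait(@mutex) while @count.zero?
      @count -= 1
    end
  end

  # Return a permit and wake one waiter.
  def signal
    @mutex.synchronize do
      @count += 1
      @cond.signal
    end
  end
end

lock   = TinySemaphore.new(1)
events = []

lock.wait                 # async_accept: take the only permit
joiner = Thread.new do
  lock.wait               # join: blocks until terminate signals
  events << :joined
end

events << :terminating
lock.signal               # terminate: release the permit
joiner.join
events # => [:terminating, :joined]
```

Under this assumption, a caller of #join stays blocked for as long as the node is accepting and only resumes once #terminate has signalled the lock.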