Class: ASS::RPC
- Inherits: Object (Object > ASS::RPC)
- Defined in: lib/ass/rpc.rb
Defined Under Namespace
Classes: Future
Instance Attribute Summary
- #buffer ⇒ Object (readonly): Returns the value of attribute buffer.
- #futures ⇒ Object (readonly): Returns the value of attribute futures.
- #name ⇒ Object (readonly): Returns the value of attribute name.
- #ready ⇒ Object (readonly): Returns the value of attribute ready.
Class Method Summary
- .random_id ⇒ Object: Returns a random hexadecimal id string (stolen from nanite).
Instance Method Summary
- #call(server_name, method, data = nil, opts = {}, meta = nil) ⇒ Object
- #initialize(opts = {}) ⇒ RPC (constructor): A new instance of RPC.
- #inspect ⇒ Object
- #wait(future, timeout = nil) ⇒ Object: Blocks on a synchronized queue until the requested future arrives.
- #waitall ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ RPC
Returns a new instance of RPC.
# File 'lib/ass/rpc.rb', line 57
def initialize(opts={})
  raise "can't run rpc client in the same thread as eventmachine" if EM.reactor_thread?
  self.extend(MonitorMixin)
  @seq = 0
  # The queue is used to synchronize the RPC user thread
  # and the AMQP EventMachine thread.
  @buffer = Queue.new
  @ready = {}   # the ready results not yet waited for
  @futures = {} # all futures not yet waited for
  # Creates an exclusive queue to serve the RPC client.
  @rpc_id = ASS::RPC.random_id.to_s
  buffer = @buffer # closure binding for the reactor
  exchange = ASS.mq.direct("__rpc__")
  @name = "__rpc__#{@rpc_id}"
  queue = ASS.mq.queue(@name,
                       :exclusive => true,
                       :auto_delete => true)
  queue.bind("__rpc__", :routing_key => @rpc_id)
  queue.subscribe { |header,payload|
    payload = ::Marshal.load(payload)
    buffer << [header,payload]
  }
end
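The constructor's `Queue` is what hands replies from the EventMachine thread over to the user thread. The following is a minimal sketch of that hand-off only, assuming a plain `Thread` in place of the reactor; the `:fake_header` value and the payload contents are made up for illustration and no AMQP connection is involved.

```ruby
require "thread"

# Stand-in for @buffer: a thread-safe queue shared by both threads.
buffer = Queue.new

# Hypothetical stand-in for the subscribe block. In the real client that
# block runs on the EventMachine thread; here a plain Thread plays the role.
producer = Thread.new do
  wire_payload = Marshal.dump({ "data" => 42, "method" => :answer })
  # Mirror the subscribe block: unmarshal, then push [header, payload].
  buffer << [:fake_header, Marshal.load(wire_payload)]
end

# The user thread blocks here until the producer pushes a reply.
header, payload = buffer.pop
producer.join

puts payload["data"]
```

Because `Queue#pop` blocks, the consumer needs no polling loop; this is also why the constructor refuses to run on the reactor thread, which would then be waiting on itself.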
Instance Attribute Details
#buffer ⇒ Object (readonly)
Returns the value of attribute buffer.
# File 'lib/ass/rpc.rb', line 56
def buffer
  @buffer
end
#futures ⇒ Object (readonly)
Returns the value of attribute futures.
# File 'lib/ass/rpc.rb', line 56
def futures
  @futures
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/ass/rpc.rb', line 55
def name
  @name
end
#ready ⇒ Object (readonly)
Returns the value of attribute ready.
# File 'lib/ass/rpc.rb', line 56
def ready
  @ready
end
Class Method Details
.random_id ⇒ Object
stolen from nanite
# File 'lib/ass/rpc.rb', line 9
def self.random_id
  values = [
    rand(0x0010000),
    rand(0x0010000),
    rand(0x0010000),
    rand(0x0010000),
    rand(0x0010000),
    rand(0x1000000),
    rand(0x1000000),
  ]
  "%04x%04x%04x%04x%04x%06x%06x" % values
end
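To see what shape of id this produces, the generator can be run on its own. The sketch below copies the method body under a hypothetical module name, `RandomIdSketch`, so that it runs without the ASS gem loaded.

```ruby
# Copy of the generator under a hypothetical module name,
# so the snippet runs without the ASS gem.
module RandomIdSketch
  def self.random_id
    values = [
      rand(0x0010000), rand(0x0010000), rand(0x0010000),
      rand(0x0010000), rand(0x0010000),
      rand(0x1000000), rand(0x1000000),
    ]
    # Five 4-digit groups plus two 6-digit groups, zero-padded.
    "%04x%04x%04x%04x%04x%06x%06x" % values
  end
end

id = RandomIdSketch.random_id
puts id.length # 5 * 4 + 2 * 6 hex digits
```

The result is always 32 lowercase hex digits. `Kernel#rand` is not a cryptographically secure source, so the id should be treated as a collision-avoidance token rather than a secret.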
Instance Method Details
#call(server_name, method, data = nil, opts = {}, meta = nil) ⇒ Object
# File 'lib/ass/rpc.rb', line 81
def call(server_name,method,data=nil,opts={},meta=nil)
  self.synchronize do
    message_id = @seq.to_s # the message id must be unique for this RPC client.
    # by default route message to the exchange @name@, with routing key @name@
    ASS.call(server_name,
             method,
             data,
             # can't override these options
             opts.merge(:message_id => message_id,
                        :reply_to => "__rpc__",
                        :key => @rpc_id),
             meta)
    @seq += 1
    @futures[message_id] = Future.new(self,message_id)
  end
end
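The synchronized block in `call` exists so that concurrent callers never share a sequence number. A minimal sketch of just that bookkeeping, assuming a hypothetical `SequenceSketch` class and a `:pending` placeholder where the real client stores a `Future`; nothing is sent over AMQP here.

```ruby
require "monitor"

# Hypothetical stand-in for ASS::RPC that keeps only the id bookkeeping.
class SequenceSketch
  include MonitorMixin

  attr_reader :futures

  def initialize
    super() # lets MonitorMixin set up its lock
    @seq = 0
    @futures = {}
  end

  # Reserve the next message id and register a placeholder for its reply.
  def call
    synchronize do
      message_id = @seq.to_s
      @seq += 1
      @futures[message_id] = :pending
      message_id
    end
  end
end

client = SequenceSketch.new
# Four threads each reserve 25 ids; Thread#value joins and returns the ids.
threads = 4.times.map { Thread.new { 25.times.map { client.call } } }
ids = threads.flat_map(&:value)
puts ids.uniq.size
```

Holding the monitor across both the increment and the hash write keeps the id and its registered entry consistent with each other.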
#inspect ⇒ Object
# File 'lib/ass/rpc.rb', line 177
def inspect
  "#<#{self.class} #{self.name}>"
end
#wait(future, timeout = nil) ⇒ Object
The idea is to block on a synchronized queue until we get the future we want.
WARNING: blocks forever if the thread calling wait is the same as the EventMachine thread.
It is safe to use the RPC client within an ASS server/actor, because the wait happens in an EM worker thread rather than in the EM thread itself, so the EM thread is still free to process the queue. CAVEAT: you could run out of EM worker threads.
# File 'lib/ass/rpc.rb', line 111
def wait(future,timeout=nil)
  return future.data if future.done? # future was waited before
  # we can have more fine grained synchronization later.
  ## easiest thing to do (later) is use threadsafe hash for @futures and @ready.
  ### But it's actually trickier than
  ### that. Before each @buffer.pop, a thread
  ### has to check again if it sees the result
  ### in @ready.
  self.synchronize do
    timer = nil
    if timeout
      timer = EM.add_timer(timeout) {
        @buffer << [:timeout,future.message_id]
      }
    end
    ready_future = nil
    if @ready.has_key? future.message_id
      @ready.delete future.message_id
      ready_future = future
    else
      while true
        header,payload = @buffer.pop # synchronize. like erlang's mailbox select.
        if header == :timeout # timeout the future we are waiting for.
          message_id = payload
          # if we got a timeout from a previous wait, throw it away.
          next if future.message_id != message_id
          future.timeout = true
          future.done!
          @futures.delete future.message_id
          return yield # return the value of timeout block
        end
        data = payload["data"]
        some_future = @futures[header.message_id]
        # If we didn't find the future among the
        # futures, it must have timed out. Just
        # throw the result away and keep processing.
        next unless some_future
        some_future.timeout = false
        some_future.header = header
        some_future.data = data
        some_future.method = payload["method"]
        some_future.meta = payload["meta"]
        if some_future == future
          # The future we are waiting for
          EM.cancel_timer(timer) if timer
          ready_future = future
          break
        else
          # Ready, but we are not waiting for it. Save for later.
          @ready[some_future.message_id] = some_future
        end
      end
    end
    ready_future.done!
    @futures.delete ready_future.message_id
    return ready_future.data
  end
end
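The core of `wait` is a mailbox select: pop replies until the wanted one shows up, and stash the others for later waits. A simplified sketch of that loop, assuming a hypothetical `MailboxSketch` class that works on bare ids and values, with no timeouts, no `Future` objects, and a single waiting thread:

```ruby
require "thread"

# Hypothetical, simplified mailbox: no timeouts, no Future objects.
class MailboxSketch
  def initialize
    @buffer = Queue.new # replies pushed by the producer thread
    @ready  = {}        # replies that arrived before anyone waited on them
  end

  def deliver(id, data)
    @buffer << [id, data]
  end

  # Block until the reply for +id+ arrives, stashing other replies for later.
  def wait(id)
    return @ready.delete(id) if @ready.key?(id)
    loop do
      got_id, data = @buffer.pop
      return data if got_id == id
      @ready[got_id] = data
    end
  end
end

box = MailboxSketch.new
producer = Thread.new do
  box.deliver("1", :second) # arrives first, but is waited on last
  box.deliver("0", :first)
end

first  = box.wait("0") # stashes "1" in @ready while looking for "0"
second = box.wait("1") # served from @ready without touching the queue
producer.join
```

Replies can therefore be waited on in any order, regardless of the order in which they arrive.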
#waitall ⇒ Object
# File 'lib/ass/rpc.rb', line 171
def waitall
  # Hash#values yields each future directly, so the block takes one argument.
  @futures.values.map { |future|
    wait(future)
  }
end