Class: QRPC::Server
- Inherits:
-
Object
- Object
- QRPC::Server
- Defined in:
- lib/qrpc/server.rb,
lib/qrpc/server/job.rb,
lib/qrpc/server/dispatcher.rb
Overview
Queue RPC server.
Defined Under Namespace
Classes: Dispatcher, Job
Constant Summary collapse
- QRPC_PREFIX =
Deprecated.
(since 0.2.0)
Prefix for handled queues.
QRPC::QUEUE_PREFIX
- QRPC_POSTFIX_INPUT =
Deprecated.
(since 0.2.0)
Input queue postfix.
QRPC::QUEUE_POSTFIX_INPUT
- QRPC_POSTFIX_OUTPUT =
Deprecated.
(since 0.2.0)
Output queue postfix.
QRPC::QUEUE_POSTFIX_OUTPUT
- @@servers =
Holds servers for finalizing.
{ }
Class Method Summary collapse
-
.finalize(id) ⇒ Object
Finalizer handler.
Instance Method Summary collapse
-
#finalize! ⇒ Object
Destructor.
-
#initialize(api, synchronicity = :synchronous, protocol = QRPC::default_protocol) ⇒ Server
constructor
Constructor.
-
#input_name ⇒ Symbol
Returns input name.
-
#input_queue(&block) ⇒ Object
Returns input queue.
-
#listen!(locator, opts = { }) ⇒ Object
Listens to the queue.
-
#output_name(client) ⇒ Symbol
Returns output name for client name.
-
#output_queue(&block) ⇒ Object
Returns output queue.
-
#start_listening(locator, opts = { }) ⇒ Object
Starts listening to the queue.
Constructor Details
#initialize(api, synchronicity = :synchronous, protocol = QRPC::default_protocol) ⇒ Server
Constructor.
127 128 129 130 131 132 133 134 135 136 |
# File 'lib/qrpc/server.rb', line 127
#
# Constructor.
#
# Stores the given settings in instance variables, prepares an empty
# output name cache and registers the new instance for finalization.
#
# @param api [Object] stored in @api
#   (NOTE(review): presumably the object whose methods are served -- confirm)
# @param synchronicity [Symbol] stored in @synchronicity; defaults to
#   :synchronous
# @param protocol [Object] stored in @protocol; defaults to
#   QRPC::default_protocol
def initialize(api, synchronicity = :synchronous, protocol = QRPC::default_protocol)
    @api, @protocol, @synchronicity = api, protocol, synchronicity
    @output_name_cache = { }

    # Destructor: the class-level finalize handler is attached to this
    # object, and the object is stored under its object id so the
    # handler can look it up in @@servers.
    finalizer = self.class.method(:finalize).to_proc
    ObjectSpace.define_finalizer(self, finalizer)
    @@servers[object_id] = self
end
Class Method Details
.finalize(id) ⇒ Object
Finalizer handler.
143 144 145 146 147 |
# File 'lib/qrpc/server.rb', line 143
#
# Finalizer handler.
#
# Looks the given id up in @@servers and, when an entry is present,
# calls #finalize! on the stored server. Does nothing for unknown ids.
#
# @param id [Object] key into @@servers (instances are stored there
#   under their object id by the constructor)
# @return [Object, nil] result of #finalize!, or nil when the id is
#   not registered
def self.finalize(id)
    @@servers[id].finalize! if @@servers.key?(id)
end
Instance Method Details
#finalize! ⇒ Object
Destructor.
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/qrpc/server.rb', line 153
#
# Destructor.
#
# Shuts down whichever queues have been created so far. Queues that
# were never opened (still nil) are skipped.
def finalize!
    # Input queue: subscribe to "default" again, drop the subscription
    # to this server's input name, then close the queue.
    unless @input_queue.nil?
        @input_queue.subscribe("default") do
            @input_queue.unsubscribe(@input_name.to_s) { @input_queue.close! }
        end
    end

    # Output queue: switch to "default" and close.
    unless @output_queue.nil?
        @output_queue.use("default") { @output_queue.close }
    end
end
#input_name ⇒ Symbol
Returns input name.
270 271 272 273 274 275 276 |
# File 'lib/qrpc/server.rb', line 270
#
# Returns input name.
#
# The name is built once as
# "<QRPC::QUEUE_PREFIX>-<locator queue name>-<QRPC::QUEUE_POSTFIX_INPUT>",
# converted to a symbol and memoized in @input_name.
#
# @return [Symbol] input name
def input_name
    if @input_name.nil?
        prefix = QRPC::QUEUE_PREFIX + "-" + @locator.queue_name
        @input_name = (prefix + "-" + QRPC::QUEUE_POSTFIX_INPUT).to_sym
    end

    @input_name
end
#input_queue(&block) ⇒ Object
Returns input queue.
214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/qrpc/server.rb', line 214
#
# Returns input queue.
#
# The queue is not returned directly; it is passed to the given block
# once the subscription to this server's input name has been made.
# On first use the queue is taken from the locator and additionally
# unsubscribed from "default" before the block is called.
#
# @yield [queue] the input queue
def input_queue(&block)
    first_use = @input_queue.nil?
    @input_queue = @locator.input_queue if first_use

    @input_queue.subscribe(self.input_name.to_s) do
        if first_use
            # A freshly obtained queue is detached from "default" first.
            @input_queue.unsubscribe("default") { yield @input_queue }
        else
            yield @input_queue
        end
    end
end
#listen!(locator, opts = { }) ⇒ Object
Listens to the queue. (Blocking call which starts eventmachine.)
178 179 180 181 182 |
# File 'lib/qrpc/server.rb', line 178
#
# Listens to the queue. (Blocking call which starts eventmachine.)
#
# Wraps #start_listening in EM.run, passing both arguments through
# unchanged.
#
# @param locator [Object] forwarded to #start_listening
# @param opts [Hash] forwarded to #start_listening
def listen!(locator, opts = { })
    EM.run { self.start_listening(locator, opts) }
end
#output_name(client) ⇒ Symbol
Returns the output queue name for the given client name.
249 250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/qrpc/server.rb', line 249
#
# Returns output name for client name.
#
# The name has the form
# "<QRPC::QUEUE_PREFIX>-<client>-<QRPC::QUEUE_POSTFIX_OUTPUT>" and is
# returned as a symbol. Results are kept in @output_name_cache, keyed
# by the symbolized client name.
#
# @param client [#to_sym, #to_s] client name
# @return [Symbol] output name
def output_name(client)
    cache_key = client.to_sym

    # Build and remember the name only on a cache miss.
    unless @output_name_cache.include? cache_key
        built = QRPC::QUEUE_PREFIX + "-" + client.to_s + "-" + QRPC::QUEUE_POSTFIX_OUTPUT
        @output_name_cache[cache_key] = built.to_sym
    end

    @output_name_cache[cache_key]
end
#output_queue(&block) ⇒ Object
Returns output queue.
234 235 236 237 238 239 240 |
# File 'lib/qrpc/server.rb', line 234
#
# Returns output queue.
#
# The queue is taken from the locator on the first call and memoized
# in @output_queue afterwards. The block parameter is accepted but is
# not used by this method.
#
# @return [Object] output queue
def output_queue(&block)
    return @output_queue unless @output_queue.nil?

    @output_queue = @locator.output_queue
end
#start_listening(locator, opts = { }) ⇒ Object
Starts listening to the queue. (Unlike #listen!, this call does not start EventMachine itself; it expects the EventMachine reactor to be already running.)
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/qrpc/server.rb', line 192
#
# Starts listening to the queue.
#
# Remembers the locator, creates a dispatcher, schedules periodic
# cleaning of the output name cache and starts processing jobs from
# the input queue. EventMachine calls are made directly here, so the
# reactor is expected to be running already (see #listen!).
#
# @param locator [Object] stored in @locator and used to obtain queues
# @param opts [Hash] not used by this method
def start_listening(locator, opts = { })
    @locator = locator
    @dispatcher = QRPC::Server::Dispatcher.new

    # Cache cleaning: empty the output name cache every 20 seconds.
    EM.add_periodic_timer(20) { @output_name_cache.clear }

    # Process input queue: each popped job is handed to #process_job.
    self.input_queue do |input|
        input.pop(true) do |incoming|
            self.process_job(incoming)
        end
    end
end