Class: QRPC::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/qrpc/server.rb,
lib/qrpc/server/job.rb,
lib/qrpc/server/dispatcher.rb

Overview

Queue RPC server.

Defined Under Namespace

Classes: Dispatcher, Job

Constant Summary collapse

QRPC_PREFIX =
Deprecated.

(since 0.2.0)

Prefix for handled queues.

QRPC::QUEUE_PREFIX
QRPC_POSTFIX_INPUT =
Deprecated.

(since 0.2.0)

Input queue postfix.

QRPC::QUEUE_POSTFIX_INPUT
QRPC_POSTFIX_OUTPUT =
Deprecated.

(since 0.2.0)

Output queue postfix.

QRPC::QUEUE_POSTFIX_OUTPUT
@@servers =

Holds servers for finalizing.

{ }

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(api, synchronicity = :synchronous, protocol = QRPC::default_protocol) ⇒ Server

Constructor.

Parameters:

  • api (Object)

    some object which will be used as RPC API

  • synchronicity (Symbol) (defaults to: :synchronous)

    API methods synchronicity

  • protocol (QRPC::Protocol::Abstract) (defaults to: QRPC::default_protocol)

    a protocol handling instance



127
128
129
130
131
132
133
134
135
136
# File 'lib/qrpc/server.rb', line 127

def initialize(api, synchronicity = :synchronous, protocol = QRPC::default_protocol)
    @api = api
    @protocol = protocol
    @synchronicity = synchronicity
    @output_name_cache = { }
    
    # Destructor
    ObjectSpace.define_finalizer(self, self.class.method(:finalize).to_proc)
    @@servers[self.object_id] = self
end

Class Method Details

.finalize(id) ⇒ Object

Finalizer handler.

Parameters:

  • id (Integer)

    id of finalized instance



143
144
145
146
147
# File 'lib/qrpc/server.rb', line 143

def self.finalize(id)
    if @@servers.has_key? id
        @@servers[id].finalize!
    end
end

Instance Method Details

#finalize!Object

Destructor.



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/qrpc/server.rb', line 153

def finalize!
    if not @input_queue.nil?
        @input_queue.subscribe("default") do
            @input_queue.unsubscribe(@input_name.to_s) do
                @input_queue.close!
            end
        end
    end
    
    if not @output_queue.nil?
        @output_queue.use("default") do
            @output_queue.close
        end
    end
end

#input_nameSymbol

Returns input name.

Returns:

  • (Symbol)

    input name

Since:

  • 0.1.1



270
271
272
273
274
275
276
# File 'lib/qrpc/server.rb', line 270

def input_name
    if @input_name.nil?
        @input_name = (QRPC::QUEUE_PREFIX + "-" + @locator.queue_name + "-" + QRPC::QUEUE_POSTFIX_INPUT).to_sym
    end
    
    return @input_name
end

#input_queue(&block) ⇒ Object

Returns input queue.

Parameters:

  • block (Proc)

    block to which will be input queue given



214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/qrpc/server.rb', line 214

def input_queue(&block)
    if @input_queue.nil?
        @input_queue = @locator.input_queue 
        @input_queue.subscribe(self.input_name.to_s) do
            @input_queue.unsubscribe("default") do
                yield @input_queue 
            end
        end
    else
        @input_queue.subscribe(self.input_name.to_s) do
            yield @input_queue
        end
    end
end

#listen!(locator, opts = { }) ⇒ Object

Listens to the queue. (Blocking call which starts eventmachine.)

Parameters:

  • locator (QRPC::Locator)

    of the input queue

  • opts (Hash) (defaults to: { })

    options for the server



178
179
180
181
182
# File 'lib/qrpc/server.rb', line 178

def listen!(locator, opts = { })
    EM.run do
        self.start_listening(locator, opts)
    end
end

#output_name(client) ⇒ Symbol

Returns output name for client name.

Parameters:

  • client (String, Symbol)

    client identifier

Returns:

  • (Symbol)

    output name



249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/qrpc/server.rb', line 249

def output_name(client)
    client_index = client.to_sym
    
    if not @output_name_cache.include? client_index
       output_name = QRPC::QUEUE_PREFIX + "-" + client.to_s + "-" + QRPC::QUEUE_POSTFIX_OUTPUT
       output_name = output_name.to_sym
       @output_name_cache[client_index] = output_name
    else
        output_name = @output_name_cache[client_index]
    end
       
    return output_name
end

#output_queue(&block) ⇒ Object

Returns output queue.

Parameters:

  • block (Proc)

    block to which will be output queue given



234
235
236
237
238
239
240
# File 'lib/qrpc/server.rb', line 234

def output_queue(&block)
    if @output_queue.nil?
        @output_queue = @locator.output_queue
    else
        @output_queue
    end
end

#start_listening(locator, opts = { }) ⇒ Object

Starts listening to the queue. (Blocking queue which expect, eventmachine is started.)

Parameters:

  • locator (QRPC::Locator)

    of the input queue

  • opts (Hash) (defaults to: { })

    options for the server



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/qrpc/server.rb', line 192

def start_listening(locator, opts = { })
    @locator = locator
    @dispatcher = QRPC::Server::Dispatcher::new

    # Cache cleaning dispatcher
    EM::add_periodic_timer(20) do
        @output_name_cache.clear
    end

    # Process input queue
    self.input_queue do |queue|
        queue.pop(true) do |job|
            self.process_job(job)
        end
    end
end