Class: QRPC::Client::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/qrpc/client/dispatcher.rb

Overview

Queue RPC client dispatcher (worker).

Since:

  • 0.3.0

Constant Summary collapse

@@clients =

Holds clients for finalizing.

Since:

  • 0.3.0

{ }

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(locator, generator = QRPC::default_generator, protocol = QRPC::default_protocol) ⇒ Dispatcher

Constructor.

Parameters:

Since:

  • 0.3.0



100
101
102
103
104
105
106
107
108
109
110
# File 'lib/qrpc/client/dispatcher.rb', line 100

# Constructor. Stores the collaborators, starts with an empty job table
# and registers the new instance for finalization.
#
# @param locator    queue locator, stored as given
# @param generator  ID generator (defaults to QRPC::default_generator)
# @param protocol   protocol (defaults to QRPC::default_protocol)
def initialize(locator, generator = QRPC::default_generator, protocol = QRPC::default_protocol)
    @locator = locator
    @generator = generator
    @protocol = protocol

    # Nothing is tracked yet and results pooling has not been started
    @jobs = { }
    @pooling = false

    # Destructor: the class-level finalize handler receives the object id
    # and looks the instance up in @@clients
    finalizer = self.class.method(:finalize).to_proc
    ObjectSpace.define_finalizer(self, finalizer)
    @@clients[self.object_id] = self
end

Class Method Details

.finalize(id) ⇒ Object

Finalizer handler.

Parameters:

  • id (Integer)

    id of finalized instance

Since:

  • 0.3.0



117
118
119
120
121
# File 'lib/qrpc/client/dispatcher.rb', line 117

# Finalizer handler. Runs the destructor of the client registered under
# the given object id; does nothing when no such client is registered.
#
# @param [Integer] id  id of finalized instance
def self.finalize(id)
    return unless @@clients.key?(id)

    @@clients[id].finalize!
end

Instance Method Details

#create_job(name, args, priority = QRPC::DEFAULT_PRIORITY, &block) ⇒ QRPC::Client::Job

Creates a job associated with this client session.

Parameters:

  • name (Symbol)

    name of the method of the job

  • args (Array)

    arguments of the method call

  • priority (Integer) (defaults to: QRPC::DEFAULT_PRIORITY)

    job priority

  • block (Proc)

    result returning callback

Returns:

Since:

  • 0.3.0



153
154
155
# File 'lib/qrpc/client/dispatcher.rb', line 153

# Builds a job bound to this client session: the job receives the client
# ID, the call description and this dispatcher's generator and protocol.
#
# @param [Symbol] name  name of the method of the job
# @param [Array] args  arguments of the method call
# @param [Integer] priority  job priority
# @param [Proc] block  result returning callback
# @return [QRPC::Client::Job] the new job
def create_job(name, args, priority = QRPC::DEFAULT_PRIORITY, &block)
    Client::Job.new(
        self.id,
        name,
        args,
        priority,
        @generator,
        @protocol,
        &block
    )
end

#finalize!Object

Destructor.

Since:

  • 0.3.0



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/qrpc/client/dispatcher.rb', line 127

# Destructor. Closes whichever of the input and output queues have
# been opened so far.
def finalize!
    # Input side: subscribe to "default", drop the subscription to our own
    # input name, then close
    unless @input_queue.nil?
        @input_queue.subscribe("default") do
            @input_queue.unsubscribe(@input_name.to_s) { @input_queue.close! }
        end
    end

    # Output side: switch to "default", then close
    unless @output_queue.nil?
        @output_queue.use("default") { @output_queue.close! }
    end
end

#idSymbol

Returns client (or maybe session is better) ID.

Returns:

  • (Symbol)

    client (session) ID

Since:

  • 0.3.0



279
280
281
282
283
284
285
# File 'lib/qrpc/client/dispatcher.rb', line 279

# Returns client (session) ID. The ID is requested from the generator
# on first use and remembered afterwards.
#
# @return client (session) ID
def id
    @id = @generator.generate(self) if @id.nil?
    @id
end

#input_nameSymbol

Returns input name.

Returns:

  • (Symbol)

    input queue name

Since:

  • 0.3.0



215
216
217
218
219
220
221
# File 'lib/qrpc/client/dispatcher.rb', line 215

# Returns input queue name. It is composed from the queue prefix, the
# client ID and the output postfix on first use and remembered afterwards.
#
# @return [Symbol] input queue name
def input_name
    return @input_name unless @input_name.nil?

    @input_name = (
        QRPC::QUEUE_PREFIX + "-" +
        self.id.to_s + "-" +
        QRPC::QUEUE_POSTFIX_OUTPUT
    ).to_sym
end

#input_queue(&block) ⇒ Object

Returns input queue.

Parameters:

  • block (Proc)

    block to which the input queue will be given

Since:

  • 0.3.0



228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/qrpc/client/dispatcher.rb', line 228

# Gives the input queue to the block. The queue is taken from the locator
# on first use; every call subscribes to this client's input name, and
# the first call also unsubscribes from "default" before yielding.
#
# @param [Proc] block  block to which the input queue is given
def input_queue(&block)
    first_use = @input_queue.nil?
    @input_queue = @locator.input_queue if first_use

    @input_queue.subscribe(self.input_name.to_s) do
        if first_use
            @input_queue.unsubscribe("default") { yield @input_queue }
        else
            yield @input_queue
        end
    end
end

#output_nameSymbol

Returns output name.

Returns:

  • (Symbol)

    output queue name

Since:

  • 0.3.0



248
249
250
251
252
253
254
# File 'lib/qrpc/client/dispatcher.rb', line 248

# Returns output queue name. It is composed from the queue prefix, the
# locator's queue name and the input postfix on first use and remembered
# afterwards.
#
# @return [Symbol] output queue name
def output_name
    return @output_name unless @output_name.nil?

    @output_name = (
        QRPC::QUEUE_PREFIX + "-" +
        @locator.queue_name + "-" +
        QRPC::QUEUE_POSTFIX_INPUT
    ).to_sym
end

#output_queue(&block) ⇒ Object

Returns output queue.

Parameters:

  • block (Proc)

    block to which the output queue will be given

Since:

  • 0.3.0



261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/qrpc/client/dispatcher.rb', line 261

# Gives the output queue to the block. The queue is taken from the locator
# on first use; every call switches it to this client's output name
# before yielding.
#
# @param [Proc] block  block to which the output queue is given
def output_queue(&block)
    @output_queue = @locator.output_queue if @output_queue.nil?

    @output_queue.use(self.output_name.to_s) { yield @output_queue }
end

#pool!Object

Starts input (results) pooling.

Since:

  • 0.3.0



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/qrpc/client/dispatcher.rb', line 182

# Starts input (results) pooling: registers a handler on the input queue
# which routes each parsed response to the job waiting for it, and marks
# the dispatcher as pooling.
def pool!

    # Results processing logic: responses without an ID and responses
    # nobody waits for are ignored
    processor = Proc.new do |message|
        response = @protocol.response.parse(message)
        next if response.id.nil?

        # Non-integer IDs are looked up as symbols
        id = response.id
        id = id.to_sym unless id.kind_of?(Integer)

        if @jobs.include?(id)
            @jobs[id].assign_result(response)
            @jobs.delete(id)
        end
    end

    # Runs processor for each job (expects recurring #pop)
    self.input_queue { |queue| queue.pop(true, &processor) }

    @pooling = true

end

#put(job) ⇒ Object

Puts job to client.

Parameters:

Since:

  • 0.3.0



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/qrpc/client/dispatcher.rb', line 161

# Puts job to client: remembers it when a result is expected, makes sure
# results pooling is running and pushes the serialized job to the output
# queue.
#
# @param job  job to send; asked for #notification?, #id and #serialize
def put(job)
    # Notifications are not tracked; other jobs are stored under their
    # ID, with non-integer IDs converted to symbols
    unless job.notification?
        key = job.id
        key = key.to_sym unless key.kind_of?(Integer)
        @jobs[key] = job
    end

    # Start results pooling once there is at least one tracked job
    self.pool! if !@pooling && !@jobs.empty?

    self.output_queue { |queue| queue.push(job.serialize) }
end