Class: Nanite::Dispatcher
Instance Attribute Summary collapse
-
#amq ⇒ Object
readonly
Returns the value of attribute amq.
-
#evmclass ⇒ Object
Returns the value of attribute evmclass.
-
#identity ⇒ Object
readonly
Returns the value of attribute identity.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#registry ⇒ Object
readonly
Returns the value of attribute registry.
-
#serializer ⇒ Object
readonly
Returns the value of attribute serializer.
Instance Method Summary collapse
- #dispatch(deliverable) ⇒ Object
-
#initialize(amq, registry, serializer, identity, options) ⇒ Dispatcher
constructor
A new instance of Dispatcher.
Constructor Details
#initialize(amq, registry, serializer, identity, options) ⇒ Dispatcher
Returns a new instance of Dispatcher.
6 7 8 9 10 11 12 13 14 |
# File 'lib/nanite/nanite_dispatcher.rb', line 6 def initialize(amq, registry, serializer, identity, ) @amq = amq @registry = registry @serializer = serializer @identity = identity @options = @evmclass = EM @evmclass.threadpool_size = (@options[:threadpool_size] || 20).to_i end |
Instance Attribute Details
#amq ⇒ Object (readonly)
Returns the value of attribute amq.
3 4 5 |
# File 'lib/nanite/nanite_dispatcher.rb', line 3 def amq @amq end |
#evmclass ⇒ Object
Returns the value of attribute evmclass.
4 5 6 |
# File 'lib/nanite/nanite_dispatcher.rb', line 4 def evmclass @evmclass end |
#identity ⇒ Object (readonly)
Returns the value of attribute identity.
3 4 5 |
# File 'lib/nanite/nanite_dispatcher.rb', line 3 def identity @identity end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
3 4 5 |
# File 'lib/nanite/nanite_dispatcher.rb', line 3 def @options end |
#registry ⇒ Object (readonly)
Returns the value of attribute registry.
3 4 5 |
# File 'lib/nanite/nanite_dispatcher.rb', line 3 def registry @registry end |
#serializer ⇒ Object (readonly)
Returns the value of attribute serializer.
3 4 5 |
# File 'lib/nanite/nanite_dispatcher.rb', line 3 def serializer @serializer end |
Instance Method Details
#dispatch(deliverable) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/nanite/nanite_dispatcher.rb', line 16 def dispatch(deliverable) prefix, meth = deliverable.type.split('/')[1..-1] meth ||= :index actor = registry.actor_for(prefix) operation = lambda do begin intermediate_results_proc = lambda { |*args| self.handle_intermediate_results(actor, meth, deliverable, *args) } args = [ deliverable.payload ] args.push(deliverable) if actor.method(meth).arity == 2 actor.send(meth, *args, &intermediate_results_proc) rescue Exception => e handle_exception(actor, meth, deliverable, e) end end callback = lambda do |r| if deliverable.kind_of?(Request) r = Result.new(deliverable.token, deliverable.reply_to, r, identity) Nanite::Log.info("SEND #{r.to_s([])}") amq.queue(deliverable.reply_to, :no_declare => [:secure]).publish(serializer.dump(r)) end r # For unit tests end if @options[:single_threaded] || @options[:thread_poolsize] == 1 @evmclass.next_tick { callback.call(operation.call) } else @evmclass.defer(operation, callback) end end |