Class: Wakame::ActorRequest
- Inherits: Object
- Hierarchy: Object → Wakame::ActorRequest
- Defined in: lib/wakame/master.rb
Instance Attribute Summary collapse
-
#master ⇒ Object
readonly
Returns the value of attribute master.
-
#return_value ⇒ Object
readonly
Returns the value of attribute return_value.
Instance Method Summary collapse
- #cancel ⇒ Object
-
#initialize(master, packet) ⇒ ActorRequest
constructor
A new instance of ActorRequest.
- #progress ⇒ Object
- #request ⇒ Object
- #token ⇒ Object
- #wait_completion(tout = 60*30) ⇒ Object (also: #wait)
Constructor Details
#initialize(master, packet) ⇒ ActorRequest
Returns a new instance of ActorRequest.
423 424 425 426 427 428 429 430 431 432 |
# File 'lib/wakame/master.rb', line 423
#
# Wraps an actor-request packet so it can later be sent and awaited.
#
# @param master [Object] kept as-is and exposed through #master
# @param packet [Wakame::Packets::ActorRequest] the packet this request wraps
# @raise [TypeError] when packet is not a Wakame::Packets::ActorRequest
def initialize(master, packet)
  unless packet.is_a?(Wakame::Packets::ActorRequest)
    raise TypeError
  end

  @master, @packet = master, packet
  @requested = false
  @event_ticket = nil
  @return_value = nil
  @wait_lock = ::Queue.new
end
Instance Attribute Details
#master ⇒ Object (readonly)
Returns the value of attribute master.
421 422 423 |
# File 'lib/wakame/master.rb', line 421
#
# Reader for the master object handed to the constructor.
#
# @return [Object] the current value of @master
def master
  @master
end
#return_value ⇒ Object (readonly)
Returns the value of attribute return_value.
421 422 423 |
# File 'lib/wakame/master.rb', line 421
#
# Reader for the value stored in @return_value.
#
# @return [Object, nil] the current value of @return_value
def return_value
  @return_value
end
Instance Method Details
#cancel ⇒ Object
464 465 466 467 468 469 470 |
# File 'lib/wakame/master.rb', line 464
#
# Placeholder for cancelling the request; not implemented.
#
# NOTE(review): check_requested? is defined outside this listing; presumably it
# rejects calls made before #request -- confirm.
#
# @raise [NotImplementedError] always, once check_requested? has returned
def cancel
  check_requested?

  # Sketch of the intended implementation, left disabled:
  #master.publish_to('agent_command', "agent_id.#{@packet.agent_id}", Wakame::Packets::ActorCancel.new(@packet.agent_id, ).marshal)
  #ED.unsubscribe(@event_ticket)
  raise NotImplementedError
end
#progress ⇒ Object
459 460 461 462 |
# File 'lib/wakame/master.rb', line 459
#
# Placeholder for reporting the progress of the request; not implemented.
#
# NOTE(review): check_requested? is defined outside this listing; presumably it
# rejects calls made before #request -- confirm.
#
# @raise [NotImplementedError] always, once check_requested? has returned
def progress
  check_requested?

  raise NotImplementedError
end
#request ⇒ Object
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 |
# File 'lib/wakame/master.rb', line 435
#
# Subscribes to actor-completion events for this request's token and then
# publishes the packet to the target agent.
#
# When the matching completion event arrives, the listener removes itself,
# stores the event's return value and pushes [status, return_value] onto the
# wait queue.
#
# @return [ActorRequest] self, so calls can be chained
# @raise [RuntimeError] if the request has already been sent
# @raise [StandardError] whatever master.publish_to raises; the listener is
#   removed before the error is re-raised
def request
  raise "The request has already been sent." if @requested

  @event_ticket = EventDispatcher.subscribe(Event::ActorComplete) { |event|
    # Completion events for other requests share this channel; skip them.
    if event.token == @packet.token
      # Any status other than RUNNING means the actor request has finished.
      Wakame.log.debug("#{self.class}: The actor request has been completed: token=#{token}, status=#{event.status}, return_value=#{event.return_value}")
      EventDispatcher.unsubscribe(@event_ticket)
      @return_value = event.return_value
      @wait_lock.enq([event.status, event.return_value])
    end
  }

  Wakame.log.debug("#{self.class}: Send the actor request: #{@packet.path}@#{@packet.agent_id}, token=#{token}")
  begin
    master.publish_to('agent_command', "agent_id.#{@packet.agent_id}", @packet.marshal)
  rescue StandardError
    # The packet never left, so @requested stays false and a retry is allowed.
    # Drop the listener now; otherwise the retry would register a second one
    # and overwrite the only ticket that could remove the first.
    EventDispatcher.unsubscribe(@event_ticket)
    @event_ticket = nil
    raise
  end
  @requested = true
  self
end
#token ⇒ Object
455 456 457 |
# File 'lib/wakame/master.rb', line 455
#
# Token identifying this request, as carried by the wrapped packet.
#
# @return [Object] whatever @packet.token returns
def token
  packet = @packet
  packet.token
end
#wait_completion(tout = 60*30) ⇒ Object Also known as: wait
472 473 474 475 476 477 478 479 480 481 482 483 |
# File 'lib/wakame/master.rb', line 472
#
# Blocks until a [status, return_value] pair is available on the wait queue
# and returns the return value.
#
# @param tout [Numeric] maximum number of seconds to wait (default: 30 minutes)
# @return [Object] the return value reported with the completion
# @raise [Timeout::Error] if nothing arrives within tout seconds
# @raise [RuntimeError] if the reported status is Actor::STATUS_FAILED
def wait_completion(tout=60*30)
  check_requested?
  # Call the Timeout module function explicitly: the bare Kernel-level
  # `timeout` helper is deprecated and missing from current Ruby releases.
  # The leading :: keeps the lookup away from any same-named constant in the
  # enclosing namespace.
  ::Timeout.timeout(tout) {
    Wakame.log.debug("#{self.class}: Waiting a response from the actor: #{@packet.path}@#{@packet.agent_id}, token=#{@packet.token}")
    ret_status, ret_val = @wait_lock.deq
    Wakame.log.debug("#{self.class}: A response (status=#{ret_status}) back from the actor: #{@packet.path}@#{@packet.agent_id}, token=#{@packet.token}")
    if ret_status == Actor::STATUS_FAILED
      raise RuntimeError, "Failed status has been returned: Actor Request #{token}"
    end
    ret_val
  }
end