Class: Wakame::ActorRequest
- Inherits:
-
Object
- Object
- Wakame::ActorRequest
- Defined in:
- lib/wakame/master.rb
Instance Attribute Summary collapse
-
#master ⇒ Object
readonly
Returns the value of attribute master.
-
#return_value ⇒ Object
readonly
Returns the value of attribute return_value.
Instance Method Summary collapse
- #cancel ⇒ Object
-
#initialize(master, packet) ⇒ ActorRequest
constructor
A new instance of ActorRequest.
- #progress ⇒ Object
- #request ⇒ Object
- #token ⇒ Object
- #wait_completion(tout = 60*30) ⇒ Object (also: #wait)
Constructor Details
#initialize(master, packet) ⇒ ActorRequest
Returns a new instance of ActorRequest.
100 101 102 103 104 105 106 107 108 109 |
# File 'lib/wakame/master.rb', line 100
#
# Wraps an actor-request packet so it can be sent through +master+ and
# waited on afterwards.
#
# @param master [Object] stored as-is; #request later calls +publish_to+ on it
# @param packet [Wakame::Packets::ActorRequest] the packet to be sent
# @raise [TypeError] if +packet+ is not a Wakame::Packets::ActorRequest
def initialize(master, packet)
  packet_ok = packet.is_a?(Wakame::Packets::ActorRequest)
  raise TypeError unless packet_ok

  @master, @packet = master, packet
  # Nothing has been sent yet, so there is no subscription and no result.
  @requested = false
  @event_ticket = @return_value = nil
  # Queue through which the completion callback hands [status, value]
  # to whoever is blocked in #wait_completion.
  @wait_lock = ::Queue.new
end
Instance Attribute Details
#master ⇒ Object (readonly)
Returns the value of attribute master.
98 99 100 |
# File 'lib/wakame/master.rb', line 98
#
# Reader for the +master+ object handed to the constructor.
#
# @return [Object] the value of @master
def master
  @master
end
#return_value ⇒ Object (readonly)
Returns the value of attribute return_value.
98 99 100 |
# File 'lib/wakame/master.rb', line 98
#
# Reader for the value reported by the actor. It is +nil+ until the
# completion callback registered by #request stores the event's
# +return_value+.
#
# @return [Object, nil] the value of @return_value
def return_value
  @return_value
end
Instance Method Details
#cancel ⇒ Object
141 142 143 144 145 146 147 |
# File 'lib/wakame/master.rb', line 141
#
# Placeholder for cancelling a request that has been sent. After the
# +check_requested?+ call (defined elsewhere; presumably it rejects requests
# that were never sent -- confirm) it always raises.
#
# NOTE: NotImplementedError descends from ScriptError, not StandardError,
# so a bare +rescue+ in the caller will not catch it.
#
# @raise [NotImplementedError] always
def cancel
  check_requested?
  raise NotImplementedError, "#{self.class}#cancel is not implemented"

  # Disabled sketch of the intended implementation, kept from the original:
  #master.publish_to('agent_command', "agent_id.#{@packet.agent_id}", Wakame::Packets::ActorCancel.new(@packet.agent_id, ).marshal)
  #ED.unsubscribe(@event_ticket)
end
#progress ⇒ Object
136 137 138 139 |
# File 'lib/wakame/master.rb', line 136
#
# Placeholder for querying the progress of a request that has been sent.
# After the +check_requested?+ call (defined elsewhere; presumably it rejects
# requests that were never sent -- confirm) it always raises.
#
# NOTE: NotImplementedError descends from ScriptError, not StandardError,
# so a bare +rescue+ in the caller will not catch it.
#
# @raise [NotImplementedError] always
def progress
  check_requested?
  raise NotImplementedError, "#{self.class}#progress is not implemented"
end
#request ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/wakame/master.rb', line 112
#
# Sends the packet to its target agent and registers a callback that
# delivers the outcome to #wait_completion.
#
# The ActorComplete subscription is registered before the packet is
# published. If publishing fails, the subscription is removed again so a
# retry does not leave a stale callback behind.
#
# @return [ActorRequest] self
# @raise [RuntimeError] if the request has already been sent
def request
  raise "The request has already been sent." if @requested

  @event_ticket = EventDispatcher.subscribe(Event::ActorComplete) do |event|
    # Completion events carrying another request's token are ignored.
    if event.token == @packet.token
      # Any status other than RUNNING means the actor request has finished.
      Wakame.log.debug("#{self.class}: The actor request has been completed: token=#{token}, status=#{event.status}, return_value=#{event.return_value}")
      EventDispatcher.unsubscribe(@event_ticket)
      @return_value = event.return_value
      # Wake up the caller blocked in #wait_completion.
      @wait_lock.enq([event.status, event.return_value])
    end
  end

  begin
    Wakame.log.debug("#{self.class}: Send the actor request: #{@packet.path}@#{@packet.agent_id}, token=#{token}")
    master.publish_to('agent_command', "agent_id.#{@packet.agent_id}", @packet.marshal)
    @requested = true
  ensure
    # Reached with @requested still false only when sending raised:
    # drop the subscription so it is not leaked.
    unless @requested
      EventDispatcher.unsubscribe(@event_ticket)
      @event_ticket = nil
    end
  end
  self
end
#token ⇒ Object
132 133 134 |
# File 'lib/wakame/master.rb', line 132
#
# Token of the wrapped packet. #request compares it against the token of
# incoming ActorComplete events.
#
# @return [Object] whatever +@packet.token+ returns
def token
  @packet.token
end
#wait_completion(tout = 60*30) ⇒ Object Also known as: wait
149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/wakame/master.rb', line 149
#
# Blocks until the completion callback registered by #request delivers a
# result, then returns the actor's return value.
#
# Uses Timeout.timeout rather than the deprecated top-level +timeout+,
# which is not available in modern Ruby.
#
# @param tout [Numeric] maximum number of seconds to wait (default 30 minutes)
# @return [Object] the return value reported by the actor
# @raise [Timeout::Error] if no result arrives within +tout+ seconds
# @raise [RuntimeError] if the actor reported Actor::STATUS_FAILED
def wait_completion(tout = 60 * 30)
  check_requested?
  Timeout.timeout(tout) do
    Wakame.log.debug("#{self.class}: Waiting a response from the actor: #{@packet.path}@#{@packet.agent_id}, token=#{@packet.token}")
    # Blocks here until #request's callback enqueues [status, return_value].
    ret_status, ret_val = @wait_lock.deq
    Wakame.log.debug("#{self.class}: A response (status=#{ret_status}) back from the actor: #{@packet.path}@#{@packet.agent_id}, token=#{@packet.token}")
    if ret_status == Actor::STATUS_FAILED
      raise RuntimeError, "Failed status has been returned: Actor Request #{token}"
    end
    ret_val
  end
end