Class: Wakame::ActorRequest

Inherits:
Object
  • Object
show all
Defined in:
lib/wakame/master.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(master, packet) ⇒ ActorRequest

Returns a new instance of ActorRequest.

Raises:

  • (TypeError)


100
101
102
103
104
105
106
107
108
109
# File 'lib/wakame/master.rb', line 100

def initialize(master, packet)
  raise TypeError unless packet.is_a?(Wakame::Packets::ActorRequest)

  @master = master
  @packet = packet
  @requested = false
  @event_ticket = nil
  @return_value = nil
  @wait_lock = ::Queue.new
end

Instance Attribute Details

#masterObject (readonly)

Returns the value of attribute master.



98
99
100
# File 'lib/wakame/master.rb', line 98

def master
  @master
end

#return_valueObject (readonly)

Returns the value of attribute return_value.



98
99
100
# File 'lib/wakame/master.rb', line 98

def return_value
  @return_value
end

Instance Method Details

#cancelObject

Raises:

  • (NotImplementedError)


141
142
143
144
145
146
147
# File 'lib/wakame/master.rb', line 141

def cancel
  check_requested?
  raise NotImplementedError
  
  #master.publish_to('agent_command', "agent_id.#{@packet.agent_id}", Wakame::Packets::ActorCancel.new(@packet.agent_id, ).marshal)
  #ED.unsubscribe(@event_ticket)
end

#progressObject

Raises:

  • (NotImplementedError)


136
137
138
139
# File 'lib/wakame/master.rb', line 136

def progress
  check_requested?
  raise NotImplementedError
end

#requestObject



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/wakame/master.rb', line 112

def request
  raise "The request has already been sent." if @requested

  @event_ticket = EventDispatcher.subscribe(Event::ActorComplete) { |event|
    if event.token == @packet.token
     
      # Any of status except RUNNING are accomplishment of the actor request.
      Wakame.log.debug("#{self.class}: The actor request has been completed: token=#{self.token}, status=#{event.status}, return_value=#{event.return_value}")
      EventDispatcher.unsubscribe(@event_ticket)
      @return_value = event.return_value
      @wait_lock.enq([event.status, event.return_value])
    end
  }
  Wakame.log.debug("#{self.class}: Send the actor request: #{@packet.path}@#{@packet.agent_id}, token=#{self.token}")
  master.publish_to('agent_command', "agent_id.#{@packet.agent_id}", @packet.marshal)
  @requested = true
  self
end

#tokenObject



132
133
134
# File 'lib/wakame/master.rb', line 132

def token
  @packet.token
end

#wait_completion(tout = 60*30) ⇒ Object Also known as: wait



149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/wakame/master.rb', line 149

def wait_completion(tout=60*30)
  check_requested?
  timeout(tout) {
    Wakame.log.debug("#{self.class}: Waiting a response from the actor: #{@packet.path}@#{@packet.agent_id}, token=#{@packet.token}")
    ret_status, ret_val = @wait_lock.deq
    Wakame.log.debug("#{self.class}: A response (status=#{ret_status}) back from the actor: #{@packet.path}@#{@packet.agent_id}, token=#{@packet.token}")
    if ret_status == Actor::STATUS_FAILED
      raise RuntimeError, "Failed status has been returned: Actor Request #{token}"
    end
    ret_val
  }
end