Class: Wakame::ActorRequest

Inherits:
Object
  • Object
show all
Defined in:
lib/wakame/master.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(master, packet) ⇒ ActorRequest

Returns a new instance of ActorRequest.

Raises:

  • (TypeError)


423
424
425
426
427
428
429
430
431
432
# File 'lib/wakame/master.rb', line 423

def initialize(master, packet)
  raise TypeError unless packet.is_a?(Wakame::Packets::ActorRequest)

  @master = master
  @packet = packet
  @requested = false
  @event_ticket = nil
  @return_value = nil
  @wait_lock = ::Queue.new
end

Instance Attribute Details

#masterObject (readonly)

Returns the value of attribute master.



421
422
423
# File 'lib/wakame/master.rb', line 421

def master
  @master
end

#return_valueObject (readonly)

Returns the value of attribute return_value.



421
422
423
# File 'lib/wakame/master.rb', line 421

def return_value
  @return_value
end

Instance Method Details

#cancelObject

Raises:

  • (NotImplementedError)


464
465
466
467
468
469
470
# File 'lib/wakame/master.rb', line 464

def cancel
  check_requested?
  raise NotImplementedError
  
  #master.publish_to('agent_command', "agent_id.#{@packet.agent_id}", Wakame::Packets::ActorCancel.new(@packet.agent_id, ).marshal)
  #ED.unsubscribe(@event_ticket)
end

#progressObject

Raises:

  • (NotImplementedError)


459
460
461
462
# File 'lib/wakame/master.rb', line 459

def progress
  check_requested?
  raise NotImplementedError
end

#requestObject



435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
# File 'lib/wakame/master.rb', line 435

def request
  raise "The request has already been sent." if @requested

  @event_ticket = EventDispatcher.subscribe(Event::ActorComplete) { |event|
    if event.token == @packet.token
     
      # Any of status except RUNNING are accomplishment of the actor request.
      Wakame.log.debug("#{self.class}: The actor request has been completed: token=#{self.token}, status=#{event.status}, return_value=#{event.return_value}")
      EventDispatcher.unsubscribe(@event_ticket)
      @return_value = event.return_value
      @wait_lock.enq([event.status, event.return_value])
    end
  }
  Wakame.log.debug("#{self.class}: Send the actor request: #{@packet.path}@#{@packet.agent_id}, token=#{self.token}")
  master.publish_to('agent_command', "agent_id.#{@packet.agent_id}", @packet.marshal)
  @requested = true
  self
end

#tokenObject



455
456
457
# File 'lib/wakame/master.rb', line 455

def token
  @packet.token
end

#wait_completion(tout = 60*30) ⇒ Object Also known as: wait



472
473
474
475
476
477
478
479
480
481
482
483
# File 'lib/wakame/master.rb', line 472

def wait_completion(tout=60*30)
  check_requested?
  timeout(tout) {
    Wakame.log.debug("#{self.class}: Waiting a response from the actor: #{@packet.path}@#{@packet.agent_id}, token=#{@packet.token}")
    ret_status, ret_val = @wait_lock.deq
    Wakame.log.debug("#{self.class}: A response (status=#{ret_status}) back from the actor: #{@packet.path}@#{@packet.agent_id}, token=#{@packet.token}")
    if ret_status == Actor::STATUS_FAILED
      raise RuntimeError, "Failed status has been returned: Actor Request #{token}"
    end
    ret_val
  }
end