Class: Wakame::MasterManagers::CommandQueue
- Inherits: Object
  - Inheritance chain: Object → Wakame::MasterManagers::CommandQueue
- Includes:
- Wakame::MasterManager
- Defined in:
- lib/wakame/master_managers/command_queue.rb
Defined Under Namespace
Classes: Adapter
Instance Attribute Summary
Attributes included from Wakame::MasterManager
Instance Method Summary
- #init ⇒ Object
- #initialize ⇒ CommandQueue (constructor): A new instance of CommandQueue.
- #send_cmd(cmd) ⇒ Object
- #terminate ⇒ Object
Methods included from Wakame::MasterManager
Constructor Details
#initialize ⇒ CommandQueue
Returns a new instance of CommandQueue.
12 13 14 15 16 17 18 |
# File 'lib/wakame/master_managers/command_queue.rb', line 12
#
# Builds an idle command queue: one Queue for incoming commands, one Queue
# for their results, and a statistics hash whose only counter starts at zero.
def initialize
  @queue, @result_queue = Queue.new, Queue.new
  @statistics = { :total_command_count => 0 }
end
Instance Method Details
#init ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/wakame/master_managers/command_queue.rb', line 20
#
# Starts the command worker thread, then creates and starts the Thin server
# on the host/port taken from Wakame.config.http_command_server_uri.
#
# The worker thread takes objects off @queue one at a time and pushes exactly
# one entry onto @result_queue for each of them:
# * the value returned by cmd.run,
# * the rescued exception if running the command raised a StandardError, or
# * nil when the object is not a Wakame::Command (a warning is logged).
# The loop ends as soon as a nil/false value is dequeued.
def init
  @command_thread = Thread.new {
    Wakame.log.info("#{self.class}: Started command thread: #{Thread.current}")
    while cmd = @queue.deq
      # Reset on every iteration. `res` is shared by all iterations of this
      # loop, so without the reset the `ensure` below would hand the previous
      # command's result to whoever submitted an incompatible object.
      res = nil
      begin
        unless cmd.kind_of?(Wakame::Command)
          Wakame.log.warn("#{self.class}: Incompatible type of object has been sent to ProcessCommand thread. #{cmd.class}")
          next
        end

        Wakame.log.debug("#{self.class}: Being processed the command: #{cmd.class}")
        res = cmd.run
      rescue => e
        Wakame.log.error(e)
        # The failure itself becomes the result delivered to the sender.
        res = e
      ensure
        # Always answer, even on `next`, because the sender is waiting on
        # @result_queue for exactly one entry per submitted object.
        @result_queue.enq(res)
      end
    end
  }

  cmdsv_uri = URI.parse(Wakame.config.http_command_server_uri)

  @thin_server = Thin::Server.new(cmdsv_uri.host, cmdsv_uri.port, Adapter.new(self))
  @thin_server.threaded = true
  @thin_server.start
end
#send_cmd(cmd) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/wakame/master_managers/command_queue.rb', line 55
#
# Submits +cmd+ to the command queue, fires an Event::CommandReceived for it,
# and then waits for the next entry on the result queue, which it returns.
#
# Any StandardError raised along the way is logged rather than propagated; in
# that case the method returns whatever the final Wakame.log.error call
# returns.
def send_cmd(cmd)
  @queue.enq(cmd)
  received = Event::CommandReceived.new(cmd)
  ED.fire_event(received)
  @result_queue.deq
rescue => e
  Wakame.log.error("#{self.class}:")
  Wakame.log.error(e)
end
#terminate ⇒ Object
50 51 52 53 |
# File 'lib/wakame/master_managers/command_queue.rb', line 50
#
# Stops the Thin server and kills the command thread.
#
# Each resource is only touched if it exists, so calling this before #init
# (or after #init failed part-way) is a no-op instead of a NoMethodError.
# The thread is killed from an +ensure+ so it cannot outlive a failing
# @thin_server.stop; the error from +stop+ still propagates to the caller.
#
# Returns the killed command thread, or nil if there was none.
def terminate
  begin
    @thin_server.stop if @thin_server
  ensure
    @command_thread.kill if @command_thread
  end
  @command_thread
end