Class: Wakame::MasterManagers::ActionManager
- Inherits:
  - Object
- Inheritance hierarchy:
  - Object
  - Wakame::MasterManagers::ActionManager
- Includes:
- Wakame::MasterManager
- Defined in:
- lib/wakame/master_managers/action_manager.rb
Instance Attribute Summary
-
#active_jobs ⇒ Object
readonly
Returns the value of attribute active_jobs.
-
#lock_queue ⇒ Object
readonly
Returns the value of attribute lock_queue.
Instance Method Summary
- #agent_monitor ⇒ Object
- #cancel_action(job_id) ⇒ Object
- #command_queue ⇒ Object
- #init ⇒ Object
-
#initialize ⇒ ActionManager
constructor
A new instance of ActionManager.
- #master ⇒ Object
- #run_action(action) ⇒ Object
- #terminate ⇒ Object
- #trigger_action(action = nil, &blk) ⇒ Object
Methods included from Wakame::MasterManager
Constructor Details
#initialize ⇒ ActionManager
Returns a new instance of ActionManager.
28 29 30 31 32 |
# File 'lib/wakame/master_managers/action_manager.rb', line 28
#
# Builds an ActionManager with no jobs registered yet: an empty table of
# active jobs, an empty job history list and a fresh LockQueue.
def initialize
  @lock_queue = LockQueue.new
  @job_history = []
  @active_jobs = {}
end
Instance Attribute Details
#active_jobs ⇒ Object (readonly)
Returns the value of attribute active_jobs.
14 15 16 |
# File 'lib/wakame/master_managers/action_manager.rb', line 14
#
# Read-only accessor for the table of job contexts; cancel_action and
# run_action look contexts up in it by job id.
#
# @return [Object] the current value of @active_jobs
def active_jobs
  @active_jobs
end
#lock_queue ⇒ Object (readonly)
Returns the value of attribute lock_queue.
14 15 16 |
# File 'lib/wakame/master_managers/action_manager.rb', line 14
#
# Read-only accessor for the lock queue created in the constructor.
#
# @return [Object] the current value of @lock_queue
def lock_queue
  @lock_queue
end
Instance Method Details
#agent_monitor ⇒ Object
24 25 26 |
# File 'lib/wakame/master_managers/action_manager.rb', line 24
#
# Convenience shortcut: forwards to the agent monitor owned by the master.
#
# @return [Object] whatever master.agent_monitor returns
def agent_monitor
  master.agent_monitor
end
#cancel_action(job_id) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/wakame/master_managers/action_manager.rb', line 40
#
# Cancels a running job by raising CancelBroadcast inside every worker
# thread that is currently executing one of the job's actions.
#
# The action tree is walked depth-first starting at the job's root action.
# An action receives the exception only when it is in the :running state,
# its bound thread is still alive, and that thread is not the caller's own
# thread (so a worker that triggers the cancel is not interrupted itself).
#
# Nothing happens (apart from a warning for the first case) when the job id
# is unknown or when the job context already carries :complete_at.
#
# @param job_id [Object] key of the job in @active_jobs
# @return [Object, nil] nil when nothing was canceled
def cancel_action(job_id)
  job_context = @active_jobs[job_id]
  if job_context.nil?
    Wakame.log.warn("JOB ID #{job_id} was not running.")
    return
  end
  return if job_context[:complete_at]

  broadcast = lambda { |act|
    worker = act.target_thread
    if act.status == :running && (worker && worker.alive?) && worker != Thread.current
      Wakame.log.debug "Raising CancelBroadcast exception: #{act.class} #{worker}(#{worker.status}), current=#{Thread.current}"
      # Deliver the special cancel exception to the thread running this action.
      worker.raise(CancelBroadcast, "It's broadcasted from #{act.class}")
    end

    act.subactions.each { |child| broadcast.call(child) }
  }

  # Thread.critical= only exists on Ruby 1.8. It was removed in 1.9, where
  # calling it raises NoMethodError (both here and in the ensure clause) and
  # the broadcast never happened. Keep the legacy behavior where available
  # and simply skip it elsewhere.
  legacy_critical = Thread.respond_to?(:critical=)
  begin
    Thread.critical = true if legacy_critical
    broadcast.call(job_context[:root_action])
  ensure
    Thread.critical = false if legacy_critical
    # IMPORTANT: yield so the worker threads get a chance to handle the exception.
    Thread.pass
  end
end
#command_queue ⇒ Object
20 21 22 |
# File 'lib/wakame/master_managers/action_manager.rb', line 20
#
# Convenience shortcut: forwards to the command queue owned by the master.
#
# @return [Object] whatever master.command_queue returns
def command_queue
  master.command_queue
end
#init ⇒ Object
34 35 |
# File 'lib/wakame/master_managers/action_manager.rb', line 34
#
# Intentionally empty; this manager has nothing to set up here.
# NOTE(review): presumably a lifecycle hook expected by Wakame::MasterManager
# -- confirm against the including module.
#
# @return [nil]
def init
end
#master ⇒ Object
16 17 18 |
# File 'lib/wakame/master_managers/action_manager.rb', line 16
#
# Looks up the master object through Wakame::Master.instance.
#
# @return [Object] the value returned by Wakame::Master.instance
def master
  Wakame::Master.instance
end
#run_action(action) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/wakame/master_managers/action_manager.rb', line 89
#
# Schedules +action+ for execution as part of its (already registered) job.
#
# The work is queued with EM.next_tick; inside that tick the job's
# :start_at is stamped (and Event::JobStart fired) the first time any action
# of the job runs, and the action body is handed to EM.defer. Within the
# deferred proc the action is bound to the executing thread, marked
# :running, run, and finally marked :complete and unbound again.
#
# Outcome handling inside the deferred proc:
# * success         -> completion_status :succeeded, Event::ActionComplete
# * CancelBroadcast -> completion_status :canceled, on_canceled hook,
#                      Event::ActionFailed
# * other error     -> completion_status :failed, on_failed hook,
#                      Event::ActionFailed, the error is passed to
#                      parent_action.notify, and the whole job is canceled
#                      when this action is the job's root action
# In every case process_job_complete(action, error_or_nil) is invoked through
# StatusDB.pass afterwards.
#
# @param action [Action] action whose job_id is present in @active_jobs
# @raise [ArgumentError] if +action+ is not an Action
# @raise [RuntimeError] if no job context exists for action.job_id
def run_action(action)
  raise ArgumentError unless action.is_a?(Action)

  job_context = @active_jobs[action.job_id]
  raise "The job session is killed.: job_id=#{action.job_id}" if job_context.nil?

  EM.next_tick {
    begin
      if job_context[:start_at].nil?
        job_context[:start_at] = Time.new
        ED.fire_event(Event::JobStart.new(action.job_id))
      end

      EM.defer proc {
        res = nil
        begin
          action.bind_thread(Thread.current)
          action.status = :running
          Wakame.log.debug("Start action : #{action.class.to_s} #{action.parent_action.nil? ? '' : ('sub-action of ' + action.parent_action.class.to_s)}")
          ED.fire_event(Event::ActionStart.new(action))

          action.run
          action.completion_status = :succeeded
          Wakame.log.debug("Complete action : #{action.class.to_s}")
          ED.fire_event(Event::ActionComplete.new(action))
        rescue CancelBroadcast => e
          Wakame.log.info("Received cancel signal: #{e}")
          action.completion_status = :canceled
          begin
            action.on_canceled
          rescue => hook_error
            # Use a separate variable: reusing `e` here would replace the
            # cancel exception reported below with the hook's own failure.
            Wakame.log.error(hook_error)
          end
          ED.fire_event(Event::ActionFailed.new(action, e))
          res = e
        rescue => e
          Wakame.log.error("Failed action : #{action.class.to_s} due to #{e}")
          Wakame.log.error(e)
          action.completion_status = :failed
          begin
            action.on_failed
          rescue => hook_error
            # Same reasoning as above: keep `e` pointing at the original
            # failure so that the event, the parent and the job result all
            # see the real cause.
            Wakame.log.error(hook_error)
          end
          ED.fire_event(Event::ActionFailed.new(action, e))
          # Escalate the failure to the parent action.
          unless action.parent_action.nil?
            action.parent_action.notify(e)
          end
          # Force-cancel the current job when the exception has reached the root action.
          if action === job_context[:root_action]
            Wakame.log.warn("The escalated exception (#{e.class}) has reached to the root action (#{action.class}). Forcing to cancel the current job #{job_context[:job_id]}")
            cancel_action(job_context[:job_id]) #rescue Wakame.log.error($!)
          end
          res = e
        ensure
          action.status = :complete
          action.bind_thread(nil)
        end

        StatusDB.pass {
          process_job_complete(action, res)
        }
      }
    rescue => e
      Wakame.log.error(e)
    end
  }
end
#terminate ⇒ Object
37 38 |
# File 'lib/wakame/master_managers/action_manager.rb', line 37
#
# Intentionally empty; this manager has nothing to tear down here.
# NOTE(review): presumably a lifecycle hook expected by Wakame::MasterManager
# -- confirm against the including module.
#
# @return [nil]
def terminate
end
#trigger_action(action = nil, &blk) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/wakame/master_managers/action_manager.rb', line 74
#
# Starts a new job whose root action is +action+.
#
# When a block is given it is wrapped in an Action::ProcAction and used
# instead of the +action+ argument. A job context is created for the action,
# the action is linked to this manager and to the new job id, and it is then
# handed to run_action.
#
# @param action [Action, nil] the action to run; ignored when a block is given
# @param blk [Proc] optional block to run as an Action::ProcAction
# @return [Object] the job id assigned to the action
# @raise [ArgumentError] if the resulting action is not an Action
def trigger_action(action = nil, &blk)
  action = Action::ProcAction.new(blk) if blk
  raise ArgumentError unless action.is_a?(Action)

  new_job = create_job_context(action)
  action.action_manager = self
  action.job_id = new_job[:job_id]

  run_action(action)
  action.job_id
end