Class: Wakame::ActionManager
- Inherits:
-
Object
- Object
- Wakame::ActionManager
- Includes:
- Manager
- Defined in:
- lib/wakame/action_manager.rb
Instance Attribute Summary collapse
-
#active_jobs ⇒ Object
readonly
Returns the value of attribute active_jobs.
-
#lock_queue ⇒ Object
readonly
Returns the value of attribute lock_queue.
Instance Method Summary collapse
- #agent_monitor ⇒ Object
- #cancel_action(job_id) ⇒ Object
- #command_queue ⇒ Object
- #init ⇒ Object
-
#initialize ⇒ ActionManager
constructor
A new instance of ActionManager.
- #master ⇒ Object
- #run_action(action) ⇒ Object
- #terminate ⇒ Object
- #trigger_action(action) ⇒ Object
Methods included from Manager
Constructor Details
#initialize ⇒ ActionManager
Returns a new instance of ActionManager.
26 27 28 29 30 |
# File 'lib/wakame/action_manager.rb', line 26 def initialize() @active_jobs = {} @job_history = [] @lock_queue = LockQueue.new end |
Instance Attribute Details
#active_jobs ⇒ Object (readonly)
Returns the value of attribute active_jobs.
12 13 14 |
# File 'lib/wakame/action_manager.rb', line 12 def active_jobs @active_jobs end |
#lock_queue ⇒ Object (readonly)
Returns the value of attribute lock_queue.
12 13 14 |
# File 'lib/wakame/action_manager.rb', line 12 def lock_queue @lock_queue end |
Instance Method Details
#agent_monitor ⇒ Object
22 23 24 |
# File 'lib/wakame/action_manager.rb', line 22 def agent_monitor master.agent_monitor end |
#cancel_action(job_id) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/wakame/action_manager.rb', line 38 def cancel_action(job_id) job_context = @active_jobs[job_id] if job_context.nil? Wakame.log.warn("JOB ID #{job_id} was not running.") return end return if job_context[:complete_at] root_act = job_context[:root_action] walk_subactions = proc { |a| if a.status == :running && (a.target_thread && a.target_thread.alive?) && a.target_thread != Thread.current Wakame.log.debug "Raising CancelBroadcast exception: #{a.class} #{a.target_thread}(#{a.target_thread.status}), current=#{Thread.current}" # Broadcast the special exception to all a.target_thread.raise(CancelBroadcast, "It's broadcasted from #{a.class}") # IMPORTANT: Ensure the worker thread to handle the exception. #Thread.pass end a.subactions.each { |n| walk_subactions.call(n) } } begin Thread.critical = true walk_subactions.call(root_act) ensure Thread.critical = false # IMPORTANT: Ensure the worker thread to handle the exception. Thread.pass end end |
#command_queue ⇒ Object
18 19 20 |
# File 'lib/wakame/action_manager.rb', line 18 def command_queue master.command_queue end |
#init ⇒ Object
32 33 |
# File 'lib/wakame/action_manager.rb', line 32 def init end |
#master ⇒ Object
14 15 16 |
# File 'lib/wakame/action_manager.rb', line 14 def master Wakame::Master.instance end |
#run_action(action) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/wakame/action_manager.rb', line 83 def run_action(action) raise ArguemntError unless action.is_a?(Action) job_context = @active_jobs[action.job_id] raise "The job session is killed.: job_id=#{action.job_id}" if job_context.nil? EM.next_tick { begin if job_context[:start_at].nil? job_context[:start_at] = Time.new ED.fire_event(Event::JobStart.new(action.job_id)) end EM.defer proc { res = nil begin action.bind_thread(Thread.current) action.status = :running Wakame.log.debug("Start action : #{action.class.to_s} #{action.parent_action.nil? ? '' : ('sub-action of ' + action.parent_action.class.to_s)}") ED.fire_event(Event::ActionStart.new(action)) begin action.run action.completion_status = :succeeded Wakame.log.debug("Complete action : #{action.class.to_s}") ED.fire_event(Event::ActionComplete.new(action)) end rescue CancelBroadcast => e Wakame.log.info("Received cancel signal: #{e}") action.completion_status = :canceled begin action.on_canceled rescue => e Wakame.log.error(e) end ED.fire_event(Event::ActionFailed.new(action, e)) res = e rescue => e Wakame.log.error("Failed action : #{action.class.to_s} due to #{e}") Wakame.log.error(e) action.completion_status = :failed begin action.on_failed rescue => e Wakame.log.error(e) end ED.fire_event(Event::ActionFailed.new(action, e)) # Escalate the cancelation event to parents. unless action.parent_action.nil? action.parent_action.notify(e) end # Force to cancel the current job when the root action ignored the elevated exception. if action === job_context[:root_action] Wakame.log.warn("The escalated exception (#{e.class}) has reached to the root action (#{action.class}). Forcing to cancel the current job #{job_context[:job_id]}") cancel_action(job_context[:job_id]) #rescue Wakame.log.error($!) end res = e ensure action.status = :complete action.bind_thread(nil) end res }, proc { |res| unless @active_jobs.has_key?(job_context[:job_id]) next end actary = [] job_context[:root_action].walk_subactions {|a| actary << a } #Wakame.log.debug(actary.collect{|a| {a.class.to_s=>a.status}}.inspect) if res.is_a?(Exception) job_context[:exception]=res end if actary.all? { |act| act.status == :complete } if actary.all? { |act| act.completion_status == :succeeded } ED.fire_event(Event::JobComplete.new(action.job_id)) else ED.fire_event(Event::JobFailed.new(action.job_id, res)) end job_context[:complete_at]=Time.now @job_history << job_context @active_jobs.delete(job_context[:job_id]) @lock_queue.quit(job_context[:job_id]) end } rescue => e Wakame.log.error(e) end } end |
#terminate ⇒ Object
35 36 |
# File 'lib/wakame/action_manager.rb', line 35 def terminate end |
#trigger_action(action) ⇒ Object
72 73 74 75 76 77 78 79 80 |
# File 'lib/wakame/action_manager.rb', line 72 def trigger_action(action) raise ArguemntError unless action.is_a?(Action) context = create_job_context(action) action.action_manager = self action.job_id = context[:job_id] run_action(action) action.job_id end |