Module: Jongleur::API
- Defined in:
- lib/jongleur/api.rb
Overview
Here be methods to be accessed by the gem’s client, i.e. the public API
Class Attribute Summary collapse
Class Method Summary collapse
-
.add_task_graph(task_graph_hash) ⇒ void
Accepts a task_graph and does some initialisation, namely the assigning of class variables and creation of the inital task matrix.
-
.failed_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that failed to finish successfully.
- .get_predecessor_pids(a_task) ⇒ Object
-
.hung_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that started but failed to finish.
-
.not_ran_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that haven’t been ran.
-
.print_graph(dir = "") ⇒ String
Prints the TaskGraph to a PDF file.
-
.run ⇒ void
The main method.
-
.start_processes ⇒ void
Starts all tasks without dependencies as separate processes.
-
.successful_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that ran successfully.
-
.trap_quit_signals ⇒ void
Forwards any quit signals to all working processes so that quitting the gem (Ctrl+C) kills all processes.
Class Attribute Details
.task_graph ⇒ Object
65 66 67 |
# File 'lib/jongleur/api.rb', line 65 def self.task_graph @@task_graph ||= {} end |
.task_matrix ⇒ Object
56 57 58 |
# File 'lib/jongleur/api.rb', line 56 def self.task_matrix @@task_matrix end |
Class Method Details
.add_task_graph(task_graph_hash) ⇒ void
This method returns an undefined value.
Accepts a task_graph and does some initialisation, namely the assigning of class variables and creation of the inital task matrix
17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/jongleur/api.rb', line 17 def self.add_task_graph(task_graph_hash) @@task_matrix = Array.new raise ArgumentError, 'Value should be Hash {task_name, [descendants]}' unless task_graph_hash.is_a?(Hash) # this task_graph will raise the error below , { A: [:B], B: :C, C: []} task_graph_hash.values.each do |val| raise ArgumentError, 'Dependent Tasks should be wrapped in an Array {task_name, [dependents]}' unless val.is_a?(Array) end # this task_graph will raise the error below , { A: [:B], B: [:C, :D], C: []} if (task_graph_hash.keys.size - task_graph_hash.values.flatten.uniq.size).negative? raise ArgumentError, 'Each dependent Task should also be defined with a separate key entry' end @@task_graph = task_graph_hash @@task_matrix = Implementation.build_task_matrix(task_graph_hash) end |
.failed_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that failed to finish successfully
83 84 85 |
# File 'lib/jongleur/api.rb', line 83 def self.failed_tasks(my_task_matrix) my_task_matrix.select { |x| x.success_status == false } end |
.get_predecessor_pids(a_task) ⇒ Object
108 109 110 111 112 113 114 |
# File 'lib/jongleur/api.rb', line 108 def self.get_predecessor_pids(a_task) pids = Array.new Implementation.get_predecessors(a_task).each do |task| pids << Implementation.get_process_id(task) end pids end |
.hung_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that started but failed to finish
102 103 104 105 106 |
# File 'lib/jongleur/api.rb', line 102 def self.hung_tasks(my_task_matrix) my_task_matrix.select { |x| x.success_status == nil && x.pid != StatusCodes::PROCESS_NOT_YET_RAN } end |
.not_ran_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that haven’t been ran
91 92 93 94 95 96 |
# File 'lib/jongleur/api.rb', line 91 def self.not_ran_tasks(my_task_matrix) my_task_matrix.select { |x| x.success_status == nil && x.exit_status == nil && x.pid == StatusCodes::PROCESS_NOT_YET_RAN } end |
.print_graph(dir = "") ⇒ String
Prints the TaskGraph to a PDF file
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/jongleur/api.rb', line 36 def self.print_graph(dir="") graph = Graphviz::Graph.new dir = Dir.pwd if (!dir || dir.empty?) file_name = File.("jongleur_graph_#{Time.now.strftime('%m%d%Y_%H%M%S')}.pdf", dir) task_graph.each do |parent_node, child_nodes| new_node = unless graph.node_exists?(parent_node) graph.add_node( parent_node ) else graph.get_node( parent_node ).first end child_nodes.each { |child_node| new_node.add_node(child_node) } end Graphviz::output(graph, path: file_name) file_name end |
.run ⇒ void
This method launches processes without precedence constraints,
This method returns an undefined value.
The main method. It starts the tasks as separate processes, according to their precedence, traps and handles signals, processes messages. On exit it will also print the Task Matrix in the /tmp directory in JSON format
traps child process signals and starts new processes when their antecedents have finished. The method will exit its own process when all children processes have finished.
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/jongleur/api.rb', line 126 def self.run unless Implementation.valid_tasks?(task_graph.keys) raise RuntimeError, 'Not all the tasks in the Task Graph are implemented as WorkerTask classes' end Implementation. 'Starting workflow...' trap_quit_signals start_processes trap(:CHLD) do begin # with WNOHANG flag we make sure Process.wait is not blocking while (res = Process.wait2(-1, Process::WNOHANG)) dead_pid = res[0] status = res[1] dead_task_name = '' Implementation.find_task_by(:pid, dead_pid) do |t| t.running = false t.exit_status = status.exitstatus t.success_status = status.success? dead_task_name = t.name end msg = "finished task: %s, process: %i, exit_status: %i, success: %s" Implementation. msg % [dead_task_name, dead_pid, status.exitstatus, status.success?] if status.success? Implementation.run_descendants(dead_task_name) else msg = "Task #{dead_task_name} with process id #{dead_pid} was not succesfully completed." Implementation.(msg) end end # it's possible for the last CHLD signal to arrive after our trap # handler has already called Process.wait twice and reaped the # available status. In such a case we must handle (and ignore) # the oncoming exception so we don't get a crash. rescue Errno::ECHILD end end loop do # We exit once all the child processes and their descendants are # accounted for if Implementation.running_tasks.empty? Implementation. 'Workflow finished' file_name = File.("jongleur_task_matrix_#{Time.now.strftime('%m%d%Y_%H%M%S')}.json", '/tmp') File.open(file_name, 'w') {|f| f.write(task_matrix.to_json) } exit 0 end sleep 1 end end |
.start_processes ⇒ void
This method returns an undefined value.
Starts all tasks without dependencies as separate processes
187 188 189 190 191 192 193 194 195 |
# File 'lib/jongleur/api.rb', line 187 def self.start_processes Implementation.tasks_without_predecessors.each do |t| t.running = true Implementation. "starting task #{t.name}" t.pid = fork do Jongleur.const_get(t.name).new(predecessors: Implementation.get_predecessors(t.name)).execute end end end |
.successful_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that ran successfully
73 74 75 76 77 |
# File 'lib/jongleur/api.rb', line 73 def self.successful_tasks(my_task_matrix) my_task_matrix.select { |x| x.success_status == true && x.exit_status == 0 } end |
.trap_quit_signals ⇒ void
This method returns an undefined value.
Forwards any quit signals to all working processes so that quitting the gem (Ctrl+C) kills all processes
201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/jongleur/api.rb', line 201 def self.trap_quit_signals %i[INT QUIT].each do |signal| Signal.trap(signal) do Implementation. " #{signal} sent to master process!" Implementation.running_tasks.each do |t| Implementation. "....killing #{t.pid}" Process.kill(:KILL, t.pid) end end end end |