Module: BlackStack::Pampa
- Defined in:
- lib/pampa.rb
Defined Under Namespace
Constant Summary collapse
- @@nodes =
arrays of workers, nodes, and jobs.
[]
- @@jobs =
[]
- @@logger =
BlackStack::DummyLogger.new(nil)
- @@dispatcher_function =
nil
- @@worker_function =
nil
Class Method Summary collapse
-
.add_job(h) ⇒ Object
add a job to the cluster.
-
.add_jobs(a) ⇒ Object
add an array of jobs to the cluster.
-
.add_node(h) ⇒ Object
add a node to the cluster.
-
.add_nodes(a) ⇒ Object
add an array of nodes to the cluster.
-
.dispatch ⇒ Object
iterate the workers.
- .dispatcher_function ⇒ Object
-
.jobs ⇒ Object
return the array of jobs.
-
.logger ⇒ Object
get and set logger.
-
.nodes ⇒ Object
return the array of nodes.
-
.relaunch(n = 10000) ⇒ Object
iterate the jobs.
-
.run_stand_alone(h, *args) ⇒ Object
This method is used to run a stand alone process.
- .set_logger(l) ⇒ Object
- .set_snippets(h) ⇒ Object
-
.stretch ⇒ Object
get attached and unassigned workers.
- .worker_function ⇒ Object
-
.workers ⇒ Object
return the array of all workers, belonging to all nodes.
Class Method Details
.add_job(h) ⇒ Object
add a job to the cluster.
45 46 47 |
# File 'lib/pampa.rb', line 45
# Register a single job in the cluster.
#
# h - job descriptor, handed unchanged to BlackStack::Pampa::Job.new.
#
# Returns the @@jobs collection (the result of `<<`).
def self.add_job(h)
  job = BlackStack::Pampa::Job.new(h)
  @@jobs << job
end
.add_jobs(a) ⇒ Object
add an array of jobs to the cluster.
50 51 52 53 54 55 56 57 58 |
# File 'lib/pampa.rb', line 50
# Register several jobs in the cluster, one add_job call per element.
#
# a - Array of job descriptors.
#
# Returns the array `a` (the result of `each`).
# Raises RuntimeError when `a` is not an Array.
def self.add_jobs(a)
  # reject anything that is not an array before touching the job list
  raise "The parameter a is not an array" unless a.is_a?(Array)

  a.each { |descriptor| add_job(descriptor) }
end
.add_node(h) ⇒ Object
add a node to the cluster.
19 20 21 |
# File 'lib/pampa.rb', line 19
# Register a single node in the cluster.
#
# h - node descriptor, handed unchanged to BlackStack::Pampa::Node.new.
#
# Returns the @@nodes collection (the result of `<<`).
def self.add_node(h)
  node = BlackStack::Pampa::Node.new(h)
  @@nodes << node
end
.add_nodes(a) ⇒ Object
add an array of nodes to the cluster.
24 25 26 27 28 29 30 31 32 |
# File 'lib/pampa.rb', line 24
# Register several nodes in the cluster, one add_node call per element.
#
# a - Array of node descriptors.
#
# Returns the array `a` (the result of `each`).
# Raises RuntimeError when `a` is not an Array.
def self.add_nodes(a)
  # reject anything that is not an array before touching the node list
  raise "The parameter a is not an array" unless a.is_a?(Array)

  a.each { |descriptor| add_node(descriptor) }
end
.dispatch ⇒ Object
iterate the workers. for each worker, iterate the job.
Parameters:
-
config: relative path of the configuration file. Example: ‘../config.rb’
-
worker: relative path of the worker.rb file. Example: ‘../worker.rb’
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/pampa.rb', line 211
# Walk every worker of every node and, for each attached worker that has an
# assigned job, ask that job to dispatch tasks to the worker.
#
# Takes no parameters. One log entry is written per worker with one of these
# outcomes: "detached", "unassigned", "job <name> not found", or "done"
# followed by the value returned by `job.run_dispatch(worker)`.
#
# Returns the array of workers that was iterated (the result of `each`).
def self.dispatch
  l = logger

  BlackStack::Pampa.workers.each do |worker|
    l.logs("worker:#{worker.id} (job:#{worker.assigned_job.to_s})... ")

    if !worker.attached
      l.logf("detached".green)
    elsif worker.assigned_job.nil?
      l.logf("unassigned".yellow)
    else
      # look up the job this worker is assigned to, comparing names as strings
      job = BlackStack::Pampa.jobs.find { |j| j.name.to_s == worker.assigned_job.to_s }

      if job.nil?
        # `job` is nil in this branch, so report the name taken from the worker;
        # calling `job.name` here would raise NoMethodError instead of logging.
        l.logf("job #{worker.assigned_job} not found".red)
      else
        l.logf("done".green + " (#{job.run_dispatch(worker).to_s.blue})")
      end
    end
  end
end
.dispatcher_function ⇒ Object
80 81 82 |
# File 'lib/pampa.rb', line 80
# Reader for the dispatcher snippet held in @@dispatcher_function
# (nil until one is stored).
def self.dispatcher_function
  @@dispatcher_function
end
.jobs ⇒ Object
return the array of jobs.
61 62 63 |
# File 'lib/pampa.rb', line 61
# Reader for the registered jobs.
#
# Returns the @@jobs collection itself, not a copy.
def self.jobs
  @@jobs
end
.logger ⇒ Object
get and set logger
66 67 68 |
# File 'lib/pampa.rb', line 66
# Reader for the logger currently held in @@logger.
def self.logger
  @@logger
end
.nodes ⇒ Object
return the array of nodes.
35 36 37 |
# File 'lib/pampa.rb', line 35
# Reader for the registered nodes.
#
# Returns the @@nodes collection itself, not a copy.
def self.nodes
  @@nodes
end
.relaunch(n = 10000) ⇒ Object
iterate the jobs. for each job, get all the tasks to relaunch. for each task to relaunch, relaunch it.
Parameters:
-
config: relative path of the configuration file. Example: ‘../config.rb’
-
worker: relative path of the worker.rb file. Example: ‘../worker.rb’
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/pampa.rb', line 184
# For every registered job, fetch the tasks that must be relaunched and
# relaunch them one by one, logging each step.
#
# n - value passed to `job.relaunching` (default 10000); the log message
#     describes it as the maximum number of tasks to fetch.
#
# Returns the jobs collection that was iterated (the result of `each`).
def self.relaunch(n=10000)
  log = self.logger

  # iterate the jobs
  BlackStack::Pampa.jobs.each do |job|
    log.logs("job:#{job.name}... ")

    log.logs("Gettting tasks to relaunch (max #{n})... ")
    to_relaunch = job.relaunching(n)
    log.logf("done".green + " (#{to_relaunch.size.to_s.blue})")

    to_relaunch.each do |task|
      # each task is indexed by the job's primary-key field, converted to a symbol
      log.logs("Relaunching task #{task[job.field_primary_key.to_sym]}... ")
      job.relaunch(task)
      log.logf('done'.green)
    end

    log.logf('done'.green)
  end
end
.run_stand_alone(h, *args) ⇒ Object
This method is used to run a stand alone process. Parameters:
743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 |
# File 'lib/pampa.rb', line 743
# Run a Proc as a stand-alone process: either once, or in an endless loop
# where each cycle lasts at least `:delay` seconds.
#
# h - Hash of options:
#     :log_filename - optional String. When nil a BlackStack::DummyLogger is
#                     used; otherwise a BlackStack::LocalLogger on that file.
#     :delay        - required Integer, compared (in seconds) against the
#                     duration of each cycle to decide how long to sleep.
#     :run_once     - required boolean. When true the process exits with
#                     status 0 after the first cycle.
#     :function     - required Proc, called with the logger as its only argument.
# args - accepted for compatibility; not used by this method.
#
# Raises ArgumentError when any option is missing or has the wrong type.
# Previously the problems were collected but never reported, so an invalid
# call fell through to the loop and failed there in less obvious ways.
def self.run_stand_alone(h, *args)
  err = []
  #err << "The parameter 'log_filename' is required." if h[:log_filename].nil? || h[:log_filename].to_s.empty?
  err << "The parameter 'delay' is required." if h[:delay].nil? || h[:delay].to_s.empty?
  err << "The parameter 'run_once' is required." if h[:run_once].nil? || h[:run_once].to_s.empty?
  err << "The parameter 'function' is required." if h[:function].nil? || h[:function].to_s.empty?

  err << "The parameter 'log_filename' must be a string." if h[:log_filename] && !h[:log_filename].is_a?(String)
  err << "The parameter 'delay' must be an integer." if h[:delay] && !h[:delay].is_a?(Integer)
  err << "The parameter 'run_once' must be a boolean." if h[:run_once] && ![true, false].include?(h[:run_once])
  err << "The parameter 'function' must be a Proc." if h[:function] && !h[:function].is_a?(Proc)

  # fail fast: report every validation problem before creating loggers or looping
  raise ArgumentError, err.join(' ') unless err.empty?

  log_filename = h[:log_filename]
  delay = h[:delay]
  run_once = h[:run_once]
  function = h[:function]

  l = log_filename.nil? ? BlackStack::DummyLogger.new(nil) : BlackStack::LocalLogger.new(log_filename)

  while true
    # get the start loop time
    l.logs 'Starting loop... '
    start = Time.now()
    l.logf 'done'.green

    begin
      function.call(l)
    # CTRL+C is caught here and ends the process
    rescue Interrupt
      l.logf "Interrupted".red
      exit(0)
    # any other StandardError is logged and the loop carries on
    rescue => e
      l.logf "Error: #{e.to_console.red}"
    end

    l.logs 'Releasing resources... '
    GC.start
    l.logf 'done'.green

    if run_once
      l.log "Finished Loop!\n".blue
      exit(0)
    end

    # get the end loop time
    l.logs 'Ending loop... '
    finish = Time.now()
    l.logf 'done'.green

    # get the difference in seconds between start and finish
    l.logs 'Calculating loop duration... '
    diff = finish - start
    l.logf 'done ('+diff.to_s.blue+')'

    l.log "Finished Loop!\n".blue

    if diff < delay
      # sleep for whatever is left of the `delay` window
      n = delay-diff
      l.logs 'Sleeping for '+n.to_label.blue+' seconds... '
      sleep n
      l.logf 'done'.green
    else
      l.log 'No sleeping. The loop took '+diff.to_label.blue+' seconds.'
    end # if diff < delay
  end # while (true)
end
.set_logger(l) ⇒ Object
70 71 72 |
# File 'lib/pampa.rb', line 70
# Replace the logger held in @@logger.
#
# l - the new logger object.
#
# Returns `l`.
def self.set_logger(l)
  @@logger = l
end
.set_snippets(h) ⇒ Object
75 76 77 78 |
# File 'lib/pampa.rb', line 75
# Store the dispatcher and/or worker snippets.
#
# h - Hash that may contain :dispatcher_function and/or :worker_function.
#     Only the keys that are present are applied; a missing key leaves the
#     current value untouched.
def self.set_snippets(h)
  if h.key?(:dispatcher_function)
    @@dispatcher_function = h[:dispatcher_function]
  end
  @@worker_function = h[:worker_function] if h.key?(:worker_function)
end
.stretch ⇒ Object
get attached and unassigned workers. assign and unassign workers to jobs.
Parameters:
-
config: relative path of the configuration file. Example: ‘../config.rb’
-
worker: relative path of the worker.rb file. Example: ‘../worker.rb’
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/pampa.rb', line 96
# Rebalance workers across jobs. For each job, collect the attached workers
# that have no job and whose id matches the job's filter; then, depending on
# the job's pending tasks and current assignments, unassign or assign workers.
#
# Takes no parameters. Every step is written to the module logger.
#
# Returns the jobs collection that was iterated (the result of `each`).
def self.stretch
  log = self.logger

  BlackStack::Pampa.jobs.each do |job|
    log.logs("job #{job.name}... ")

    # attached workers that currently have no job
    log.logs("Getting attached and unassigned workers... ")
    idle = BlackStack::Pampa.workers.select { |w| w.attached && w.assigned_job.nil? }
    log.logf('done'.green + " (#{idle.size.to_s.blue})")

    # keep only the workers whose id matches this job's filter
    log.logs("Getting workers that match the filter... ")
    idle = idle.select { |w| w.id =~ job.filter_worker_id }
    log.logf("done".green + " (#{idle.size.to_s.blue})")

    # nothing is assigned or unassigned when no idle worker matches the filter
    if idle.size > 0
      log.logs("Gettting assigned workers... ")
      assigned = BlackStack::Pampa.workers.select do |worker|
        worker.attached && worker.assigned_job.to_s == job.name.to_s
      end
      log.logf("done ".green + " (#{assigned.size.to_s.blue})")

      log.logs("Getting total pending (pending) tasks... ")
      # NOTE(review): the type returned by job.pending is not visible here; only `to_s` and `size` are used on it — confirm
      pendings = job.pending
      log.logf("done".green + " (#{pendings.to_s.blue})")

      log.logs("0 pending tasks?.... ")
      if pendings.size == 0
        log.logf("yes".green)

        # no pending work: release every worker assigned to this job
        log.logs("Unassigning all assigned workers... ")
        assigned.each do |w|
          log.logs("Unassigning worker... ")
          w.assigned_job = nil
          idle << w # add worker back to the list of unassigned
          log.logf("done".green + " (#{w.id.to_s.blue})")
        end
        log.logf('done'.green)
      else
        log.logf("no".red)

        log.logs("Under :max_pending_tasks (#{job.max_pending_tasks}) and more than 1 assigned workers ?... ")
        if pendings.size < job.max_pending_tasks && assigned.size > 1
          log.logf("yes".green)

          # shrink down to a single assigned worker
          until assigned.size <= 1
            log.logs("Unassigning worker... ")
            w = assigned.pop # TODO: find a worker with no pending tasks
            w.assigned_job = nil
            idle << w # add worker back to the array of unassigned workers
            log.logf("done".green + " (#{w.id.to_s.blue})")
          end
        else
          log.logf("no".red)

          log.logs("Over :max_assigned_workers (#{job.max_assigned_workers.to_s.blue}) and more than 1 assigned workers?... ")
          if assigned.size >= job.max_assigned_workers && assigned.size > 1
            # this branch only logs; no worker is unassigned here
            log.logf("yes".green)
          else
            log.logf("no".red)

            # assign idle workers until the job reaches :max_assigned_workers
            # or the idle list runs out
            i = assigned.size
            while i < job.max_assigned_workers
              i += 1
              log.logs("Assigning worker... ")
              w = idle.pop
              if w.nil?
                log.logf("no more workers".yellow)
                break
              end
              w.assigned_job = job.name.to_sym
              log.logf("done".green + " (#{w.id.to_s.blue})")
            end # while i < job.max_assigned_workers
          end # if assigned.size >= job.max_assigned_workers && assigned.size > 1
        end # if pendings.size < job.max_pending_tasks && assigned.size > 1
      end # if pendings.size == 0
    end # if idle.size > 0

    log.logf('done'.green)
  end
end
.worker_function ⇒ Object
84 85 86 |
# File 'lib/pampa.rb', line 84
# Reader for the worker snippet held in @@worker_function
# (nil until one is stored).
def self.worker_function
  @@worker_function
end
.workers ⇒ Object
return the array of all workers, belonging to all nodes.
40 41 42 |
# File 'lib/pampa.rb', line 40
# Collect the workers of every registered node into one flat array.
#
# Returns a new Array built from each node's `workers`, fully flattened.
def self.workers
  per_node = @@nodes.map(&:workers)
  per_node.flatten
end