Module: BlackStack::Pampa

Defined in:
lib/pampa.rb

Defined Under Namespace

Classes: Job, Node, Worker

Constant Summary collapse

@@nodes =

arrays of workers, nodes, and jobs.

[]
@@jobs =
[]
@@logger =
BlackStack::DummyLogger.new(nil)
@@dispatcher_function =
nil
@@worker_function =
nil

Class Method Summary collapse

Class Method Details

.add_job(h) ⇒ Object

add a job to the cluster.



45
46
47
# File 'lib/pampa.rb', line 45

def self.add_job(h)
    @@jobs << BlackStack::Pampa::Job.new(h)
end

.add_jobs(a) ⇒ Object

add an array of jobs to the cluster.



50
51
52
53
54
55
56
57
58
# File 'lib/pampa.rb', line 50

def self.add_jobs(a)
    # validate: the parameter a is an array
    raise "The parameter a is not an array" unless a.is_a?(Array)
    # iterate over the array
    a.each do |h|
        # create the job
        self.add_job(h)
    end
end

.add_node(h) ⇒ Object

add a node to the cluster.



19
20
21
# File 'lib/pampa.rb', line 19

def self.add_node(h)
    @@nodes << BlackStack::Pampa::Node.new(h)
end

.add_nodes(a) ⇒ Object

add an array of nodes to the cluster.



24
25
26
27
28
29
30
31
32
# File 'lib/pampa.rb', line 24

def self.add_nodes(a)
    # validate: the parameter a is an array
    raise "The parameter a is not an array" unless a.is_a?(Array)
    # iterate over the array
    a.each do |h|
        # create the node
        self.add_node(h)
    end
end

.dispatchObject

iterate the workers. for each worker, iterate the job.

Parameters:

  • config: relative path of the configuration file. Example: ‘../config.rb’

  • worker: relative path of the worker.rb file. Example: ‘../worker.rb’



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/pampa.rb', line 211

def self.dispatch()
    # getting logger
    l = self.logger()
    # iterate the workers
    BlackStack::Pampa.workers.each { |worker|
        l.logs("worker:#{worker.id} (job:#{worker.assigned_job.to_s})... ")
        if !worker.attached
          l.logf("detached".green)
        else
          if worker.assigned_job.nil?
            l.logf("unassigned".yellow)
          else
            # get the job this worker is assigned to
            job = BlackStack::Pampa.jobs.select { |j| j.name.to_s == worker.assigned_job.to_s }.first
            if job.nil?
              l.logf("job #{job.name} not found".red)
            else
              l.logf("done".green + " (#{job.run_dispatch(worker).to_s.blue})")
            end
          end
        end
    } # @@nodes.each do |node|            
end

.dispatcher_functionObject



80
81
82
# File 'lib/pampa.rb', line 80

def self.dispatcher_function
  @@dispatcher_function
end

.jobsObject

return the array of nodes.



61
62
63
# File 'lib/pampa.rb', line 61

def self.jobs()
  @@jobs
end

.loggerObject

get and set logger



66
67
68
# File 'lib/pampa.rb', line 66

def self.logger()
  @@logger
end

.nodesObject

return the array of nodes.



35
36
37
# File 'lib/pampa.rb', line 35

def self.nodes()
    @@nodes
end

.relaunch(n = 10000) ⇒ Object

iterate the jobs. for each job, get all the tasks to relaunch. for each task to relaunch, relaunch it.

Parameters:

  • config: relative path of the configuration file. Example: ‘../config.rb’

  • worker: relative path of the worker.rb file. Example: ‘../worker.rb’



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/pampa.rb', line 184

def self.relaunch(n=10000)
  # getting logger
  l = self.logger()
  # iterate the workers
  BlackStack::Pampa.jobs.each { |job|
    l.logs("job:#{job.name}... ")
      l.logs("Gettting tasks to relaunch (max #{n})... ")
      tasks = job.relaunching(n)
      l.logf("done".green + " (#{tasks.size.to_s.blue})")

      tasks.each { |task| 
        l.logs("Relaunching task #{task[job.field_primary_key.to_sym]}... ")
        job.relaunch(task)
        l.logf 'done'.green
      }

    l.logf 'done'.green
  }
end

.run_stand_alone(h, *args) ⇒ Object

This method is used to run a stand alone process. Parameters:

  • h - the name of the log file. If nil, the log won’t be printed.

  • h - the minimum delay between loops. A minimum of 10 seconds is recommended, in order to don’t hard the database server. Default is 30 seconds.

  • h - avoid infinite loop. Default is false.

  • h - the Proc to be executed in each loop.



743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
# File 'lib/pampa.rb', line 743

def self.run_stand_alone(h, *args)
    err = []
    #err << "The parameter 'log_filename' is required." if h[:log_filename].nil? || h[:log_filename].to_s.empty?
    err << "The parameter 'delay' is required." if h[:delay].nil? || h[:delay].to_s.empty?
    err << "The parameter 'run_once' is required." if h[:run_once].nil? || h[:run_once].to_s.empty?
    err << "The parameter 'function' is required." if h[:function].nil? || h[:function].to_s.empty?

    err << "The parameter 'log_filename' must be a string." if h[:log_filename] && !h[:log_filename].is_a?(String)
    err << "The parameter 'delay' must be an integer." if h[:delay] && !h[:delay].is_a?(Integer)
    err << "The parameter 'run_once' must be a boolean." if h[:run_once] && ![true, false].include?(h[:run_once])
    err << "The parameter 'function' must be a Proc." if h[:function] && !h[:function].is_a?(Proc)

    log_filename = h[:log_filename]
    delay = h[:delay]
    run_once = h[:run_once]
    function = h[:function]

    l = log_filename.nil? ? BlackStack::DummyLogger.new(nil) : BlackStack::LocalLogger.new(log_filename)

    while true
        # get the start loop time
        l.logs 'Starting loop... '
        start = Time.now()
        l.logf 'done'.green        
    
        begin
            function.call(l)
        # catch general exceptions
        rescue => e
            l.logf "Error: #{e.to_console.red}"
        # CTRL+C will be catched here
        rescue Interrupt => e
            l.logf "Interrupted".red
            exit(0)
        end
    
        l.logs 'Releasing resources... '
        GC.start
        l.logf 'done'.green
        
        if run_once
            l.log "Finished Loop!\n".blue
            exit(0)
        end
    
        # get the end loop time
        l.logs 'Ending loop... '
        finish = Time.now()
        l.logf 'done'.green        
                
        # get different in seconds between start and finish
        # if diff > 30 seconds
        l.logs 'Calculating loop duration... '
        diff = finish - start
        l.logf 'done ('+diff.to_s.blue+')'
    
        l.log "Finished Loop!\n".blue
    
        if diff < delay
            # sleep for 30 seconds
            n = delay-diff
                    
            l.logs 'Sleeping for '+n.to_label.blue+' seconds... '
            sleep n
            l.logf 'done'.green        
        else
            l.log 'No sleeping. The loop took '+diff.to_label.blue+' seconds.'
        end # if diff < delay
    
    end # while (true)            
end

.set_logger(l) ⇒ Object



70
71
72
# File 'lib/pampa.rb', line 70

def self.set_logger(l)
  @@logger = l
end

.set_snippets(h) ⇒ Object



75
76
77
78
# File 'lib/pampa.rb', line 75

def self.set_snippets(h)
  @@dispatcher_function = h[:dispatcher_function] if h.has_key?(:dispatcher_function)
  @@worker_function = h[:worker_function] if h.has_key?(:worker_function)
end

.stretchObject

get attached and unassigned workers. assign and unassign workers to jobs.

Parameters:

  • config: relative path of the configuration file. Example: ‘../config.rb’

  • worker: relative path of the worker.rb file. Example: ‘../worker.rb’



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/pampa.rb', line 96

def self.stretch()
  # getting logger
  l = self.logger()
  # get the job this worker is working with
  BlackStack::Pampa.jobs.each { |job|
    l.logs "job #{job.name}... "
      # get attached and unassigned workers 
      l.logs "Getting attached and unassigned workers... "
      workers = BlackStack::Pampa.workers.select { |w| w.attached && w.assigned_job.nil? }
      l.logf 'done'.green + " (#{workers.size.to_s.blue})"
      # get the workers that match the filter
      l.logs "Getting workers that match the filter... "
      workers = workers.select { |w| w.id =~ job.filter_worker_id }
      l.logf "done".green + " (#{workers.size.to_s.blue})"
      # if theere are workers
      if workers.size > 0
        l.logs("Gettting assigned workers... ") 
        assigned = BlackStack::Pampa.workers.select { |worker| worker.attached && worker.assigned_job.to_s == job.name.to_s }
        l.logf "done ".green + " (#{assigned.size.to_s.blue})"

        l.logs("Getting total pending (pending) tasks... ")
        pendings = job.pending
        l.logf "done".green + " (#{pendings.to_s.blue})"

        l.logs("0 pending tasks?.... ")
        if pendings.size == 0
          l.logf "yes".green

          l.logs("Unassigning all assigned workers... ")
          assigned.each { |w|
            l.logs("Unassigning worker... ")
            w.assigned_job = nil
            workers << w # add worker back to the list of unassigned
            l.logf "done".green + " (#{w.id.to_s.blue})"
          }
          l.logf 'done'.green
        else
          l.logf "no".red

          l.logs("Under :max_pending_tasks (#{job.max_pending_tasks}) and more than 1 assigned workers ?... ")
          if pendings.size < job.max_pending_tasks && assigned.size > 1
            l.logf "yes".green

            while assigned.size > 1
              l.logs("Unassigning worker... ")
              w = assigned.pop # TODO: find a worker with no pending tasks
              w.assigned_job = nil
              workers << w # add worker back to the array of unassigned workers
              l.logf "done".green + " (#{w.id.to_s.blue})"
            end
          else
            l.logf "no".red

            l.logs("Over :max_assigned_workers (#{job.max_assigned_workers.to_s.blue}) and more than 1 assigned workers?... ")
            if assigned.size >= job.max_assigned_workers && assigned.size > 1
              l.logf("yes".green)
            else
              l.logf("no".red)

              i = assigned.size
              while i < job.max_assigned_workers
                i += 1
                l.logs("Assigning worker... ")
                w = workers.pop
                if w.nil?
                  l.logf("no more workers".yellow)
                  break
                else
                  w.assigned_job = job.name.to_sym
                  l.logf "done".green + " (#{w.id.to_s.blue})"
                end
              end # while i < job.max_assigned_workers
            end # if assigned.size >= job.max_assigned_workers && assigned.size > 0
          end # if pendings.size < job.max_pending_tasks && assigned.size > 1
        end # if pendings.size == 0
      end # if workers.size > 0
    l.logf 'done'.green
  }
end

.worker_functionObject



84
85
86
# File 'lib/pampa.rb', line 84

def self.worker_function
  @@worker_function
end

.workersObject

return the array of all workers, beloning all nodes.



40
41
42
# File 'lib/pampa.rb', line 40

def self.workers()
  @@nodes.map { |node| node.workers }.flatten
end