Class: BandManager
- Inherits:
-
Object
- Object
- BandManager
- Defined in:
- lib/manband/bandmanager.rb
Overview
This class is used to launch new workflows
Constant Summary collapse
- @@log =
Logger.new(STDOUT)
Class Method Summary collapse
-
.add(wfile, var = [], uid = 'admin', bucket = 'manband', debug = false) ⇒ Object
Add a new workflow based on input workflow object wfile: workflow input file var: optional runtime variables uid: optional user id debug: activate debug mode, do not execute the commands.
-
.launch(wfile, var = [], uid = 'admin', bucket = 'manband', debug = false) ⇒ Object
Launch a new workflow based on input workflow object wfile: workflow input file.
-
.launchclone(id) ⇒ Object
Execute a workflow from an other one (clone it).
-
.load(wfile) ⇒ Object
Loads a workflow file.
-
.start(id, debug = false) ⇒ Object
Start a workflow in state NEW In debug mode sends a OP_SKIP instead of OP_START.
Class Method Details
.add(wfile, var = [], uid = 'admin', bucket = 'manband', debug = false) ⇒ Object
Add a new workflow based on input workflow object wfile: workflow input file var: optional runtime variables uid: optional user id debug: activate debug mode, do not execute the commands
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/manband/bandmanager.rb', line 40 def self.add(wfile, var = [], uid='admin', bucket='manband' , debug=false) rvariables = Hash.new #fworkflow = YAML.load_file(wfile) fworkflow = self.load(wfile) if fworkflow==nil return nil end terminals = 0 fworkflow["workflow"].each do |node| if node[1]["command"]!=nil # First init with vars defined in workflow exprs = node[1]["command"].scan(/#var\.(.*?)#/) if exprs.length > 0 for reg in 0..exprs.length-1 rvariables[exprs[reg][0]] = nil end end if node[1]["next"]==nil # This is a terminal node terminals += 1 @@log.debug "terminal: "+node[0] end end end @@log.debug "nb terminals: "+terminals.to_s if var!=nil var.each do |rvariable| rvardef = rvariable.split('=') rvariables[rvardef[0]]=rvardef[1] end @@log.debug "Using runtime variables: "+rvariables.to_json end workflow = WorkFlow.new(:uid => uid , :name => fworkflow["workflow"]["name"], :description => fworkflow["workflow"]["description"], :created_at => Time.now, :file => wfile, :terminals => terminals, :status => STATUS_NEW, :workdir => FlowConfig.getjobdir(), :vars => rvariables.to_json, :bucket => bucket) err =workflow.save if err == false return nil end instances = 0 regexp = nil # Check if workflow need to be run for many files if fworkflow["workflow"]["root"]["url"]!=nil nodepath = fworkflow["workflow"]["root"]["url"] if fworkflow["workflow"]["root"]["regexp"]!=nil regexp = Regexp.new(fworkflow["workflow"]["root"]["regexp"]) end filelist = Array.new Dir.new(nodepath).entries.each do |n| @@log.debug "Test file "+n if (regexp==nil || regexp.match(n)) && File.file?(nodepath+"/"+n) @@log.debug "New sub workflow" instances += 1 subworkflow = WorkFlow.new(:uid => uid , :name => fworkflow["workflow"]["name"], :description => fworkflow["workflow"]["description"], :created_at => Time.now, :file => wfile, :terminals => terminals, :status => STATUS_NEW, :workdir => FlowConfig.getjobdir(), :vars => rvariables.to_json, :bucket => bucket, :parent => workflow.id) err = subworkflow.save if !File.exists?(nodepath+"/root") FileUtils.mkdir_p subworkflow.workdir+"/root" end File.symlink(nodepath+"/"+n,subworkflow.workdir+"/root/"+n) @@log.debug("Add new sub workflow "+subworkflow.id.to_s) end end end @@log.debug "Number of instances: "+instances.to_s workflow.update(:instances => instances) return workflow.id end |
.launch(wfile, var = [], uid = 'admin', bucket = 'manband', debug = false) ⇒ Object
Launch a new workflow based on input workflow object wfile: workflow input file
uid: optional user id debug: activate debug mode, do not execute the commands
168 169 170 171 172 173 174 175 176 |
# File 'lib/manband/bandmanager.rb', line 168 def self.launch(wfile, var = [], uid='admin', bucket='manband' , debug=false) workflow = self.add(wfile,var,uid,bucket,debug) if workflow==nil return nil end @@log.debug "Start workflow "+workflow.to_s self.start(workflow,debug) return workflow end |
.launchclone(id) ⇒ Object
Execute a workflow from an other one (clone it)
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/manband/bandmanager.rb', line 146 def self.launchclone(id) workflow = WorkFlow.get(id) newworkflow = WorkFlow.new(:uid => workflow.uid , :name => workflow.name, :description => workflow.description, :created_at => Time.now, :file => workflow.file, :terminals => workflow.terminals, :status => STATUS_NEW, :workdir => FlowConfig.getjobdir(), :vars => workflow.vars, :bucket => workflow.bucket) newworkflow.save rootjob = Job.new(:wid => newworkflow.id, :node => "root", :command => "", :status => STATUS_NEW, :instances => 0, :maxinstances => 0, :workdir => '') rootjob.save newworkflow.parse('root',rootjob.id) jobmsg = '{ "id" : "'+newworkflow.id.to_s+'", "root" : "'+rootjob.id.to_s+'"}' job = Job.new(:wid => newworkflow.id, :node => "fake", :command => "", :status => STATUS_FAKE, :instances => 0, :workdir => '') Utils.publish('manband.master', { "operation" => OP_START, "msg" => jobmsg }) job.logMessage(OP_START,jobmsg) return newworkflow.id end |
.load(wfile) ⇒ Object
Loads a workflow file
20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/manband/bandmanager.rb', line 20 def self.load(wfile) if !File.exists?(wfile) @@log.error "Workflow file "+wfile+" does not exist!" return nil end begin workflow = YAML.load_file(wfile) rescue @@log.error "Error while load file" return nil end return workflow end |
.start(id, debug = false) ⇒ Object
Start a workflow in state NEW In debug mode sends a OP_SKIP instead of OP_START
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/manband/bandmanager.rb', line 116 def self.start(id,debug=false) workflow = WorkFlow.get(id) wlist = Array.new wflows = WorkFlow.all(:parent => id) if wflows == nil || wflows.length==0 wlist.push(id) else wflows.each do |wflow| wlist.push wflow.id end end @@log.debug "Execute workflow list: "+wlist.to_s wlist.each do |wflow| workflow = WorkFlow.get(wflow) rootjob = Job.new(:wid => workflow.id, :node => "root", :command => "", :status => STATUS_NEW, :instances => 0, :maxinstances => 0, :workdir => '') rootjob.save workflow.parse('root',rootjob.id) # Request workflow management msg = '{ "id" : "'+workflow.id.to_s+'", "root" : "'+rootjob.id.to_s+'"}' if debug msg = '{ "id" : "'+rootjob.id.to_s+'","workflow" : "'+workflow.id.to_s+'", "node" : "'+rootjob.id.to_s+'"}' Utils.publish("manband.master", { "operation" => OP_SKIP, "msg" => msg }) else Utils.publish("manband.master", { "operation" => OP_START, "msg" => msg }) end end end |