Class: RQ::Toucher
- Inherits:
-
MainHelper
- Object
- MainHelper
- RQ::Toucher
- Defined in:
- lib/rq/toucher.rb
Overview
nodoc
Constant Summary
Constants included from Logging
Logging::DIV0, Logging::DIV1, Logging::DIV2, Logging::DIV3, Logging::EOL, Logging::SEC0, Logging::SEC1, Logging::SEC2, Logging::SEC3
Instance Attribute Summary
Attributes inherited from MainHelper
#argv, #cmd, #dot_rq_dir, #env, #fields, #job_stdin, #loops, #main, #mode, #options, #program, #q, #qpath, #quiet, #stdin
Instance Method Summary collapse
-
#touch ⇒ Object
–{{{.
Methods inherited from MainHelper
#dumping_yaml_tuples, #field_match, #init_job_stdin!, #initialize, #loadio, #loadyaml, #set_q
Methods included from Logging
Methods included from Logging::LogMethods
#debug, #error, #fatal, #info, #logerr, #logger, #logger=, #warn
Methods included from Util
#alive?, append_features, #btrace, #columnize, #defval, #emsg, #erreq, #errmsg, #escape, #escape!, #exec, export, #fork, #getopt, #hashify, #hms, #host, #hostname, #klass, #maim, #mcp, #realpath, #stamptime, #system, #timestamp, #tmpnam, #uncache, #which_ruby
Constructor Details
This class inherits a constructor from RQ::MainHelper
Instance Method Details
#touch ⇒ Object
–{{{
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/rq/toucher.rb', line 15 def touch #--{{{ set_q @priority = @options['priority'] debug{ "priority <#{ @priority }>" } @tag = @options['tag'] debug{ "tag <#{ @tag }>" } @runner = @options['runner'] debug{ "runner <#{ @runner }>" } @restartable = @options['restartable'] debug{ "restartable <#{ @restartable }>" } @infile = @options['infile'] debug{ "infile <#{ @infile }>" } @job_stdin = @options['stdin'] debug{ "job_stdin <#{ @job_stdin }>" } @stage = @options['stage'] debug{ "stage <#{ @stage }>" } @data = @options['data'] debug{ "data <#{ @data }>" } if job_stdin == '-' and stdin? abort "cannot specify both jobs and job input on stdin" end jobs = [] unless @argv.empty? job = Job::new job['command'] = @argv.join(' ') job['priority'] = @priority job['tag'] = @tag job['runner'] = @runner job['restartable'] = @restartable job['data'] = @data jobs << job end if @infile open(@infile) do |f| debug{ "reading jobs from <#{ @infile }>" } loadio f, @infile, jobs end end if jobs.empty? and stdin? debug{ "reading jobs from <stdin>" } loadio stdin, 'stdin', jobs end abort "no jobs specified!" if jobs.empty? init_job_stdin! state = @stage ? 'holding' : 'pending' jobs.each do |job| job['state'] = state job['priority'] = @priority if @options.has_key?('priority') job['tag'] = @tag if @options.has_key?('tag') job['runner'] = @runner if @options.has_key?('runner') job['restartable'] = @restartable if @options.has_key?('restartable') job['stdin'] = @job_stdin if @job_stdin job['data'] = @data if @data end # # state + lambdas for submit process... # list = [] tmpfile = lambda do |basename| basename = File.basename basename.to_s Tempfile.new "#{ basename }_#{ Process.pid }_#{ rand.to_s }" end update_job = lambda do |pjob, ujob| kvs, jid = {}, pjob['jid'] # handle stdin pstdin, ustdin = pjob['stdin'], ujob['stdin'] if pstdin || ustdin pbuf = if pstdin pstdin = @q.standard_in_4 jid IO.read pstdin if test ?e, pstdin end ubuf = if ustdin IO.read ustdin if test ?e, ustdin end #y 'pbuf' => pbuf #y 'ubuf' => ubuf f = ustdin ? open(ustdin,'w') : tmpfile[ustdin] begin f.write pbuf if pbuf f.write ubuf if pbuf ensure f.close end kvs['stdin'] = ujob['stdin'] = f.path #y 'stdin' => ujob['stdin'] end # handle other keys allowed = %w( priority runner restartable ) allowed.each do |key| val = ujob[key] kvs[key] = val if val end @q.update(kvs, jid){|updated| list << updated} end submit_job = lambda do |job| @q.submit(job){|submitted| list << submitted} end # # update or submit # @q.transaction do pending = @q.list 'pending' pjobs, pcommands = {}, {} pending.each do |job| jid = job['jid'] command = job['command'].strip tag = job['tag'].to_s.strip pjobs[jid] = job pcommands[[command, tag]] = jid end jobs.each do |job| jid = job['jid'] command = job['command'].strip tag = job['tag'].to_s.strip if((jid = pcommands[[command, tag]])) update_job[ pjobs[jid], job ] else submit_job[ job ] end end end list.each &dumping_yaml_tuples unless @options['quiet'] jobs = nil list = nil self #--}}} end |