Class: RQ::Toucher

Inherits:
MainHelper show all
Defined in:
lib/rq/toucher.rb

Overview

nodoc

Constant Summary

Constants included from Logging

Logging::DIV0, Logging::DIV1, Logging::DIV2, Logging::DIV3, Logging::EOL, Logging::SEC0, Logging::SEC1, Logging::SEC2, Logging::SEC3

Instance Attribute Summary

Attributes inherited from MainHelper

#argv, #cmd, #dot_rq_dir, #env, #fields, #job_stdin, #loops, #main, #mode, #options, #program, #q, #qpath, #quiet, #stdin

Instance Method Summary collapse

Methods inherited from MainHelper

#dumping_yaml_tuples, #field_match, #init_job_stdin!, #initialize, #loadio, #loadyaml, #set_q

Methods included from Logging

append_features

Methods included from Logging::LogMethods

#debug, #error, #fatal, #info, #logerr, #logger, #logger=, #warn

Methods included from Util

#alive?, append_features, #btrace, #columnize, #defval, #emsg, #erreq, #errmsg, #escape, #escape!, #exec, export, #fork, #getopt, #hashify, #hms, #host, #hostname, #klass, #maim, #mcp, #realpath, #stamptime, #system, #timestamp, #tmpnam, #uncache, #which_ruby

Constructor Details

This class inherits a constructor from RQ::MainHelper

Instance Method Details

#touchObject

–{{{



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/rq/toucher.rb', line 15

def touch
#--{{{
  set_q

  @priority = @options['priority']
  debug{ "priority <#{ @priority }>" }

  @tag = @options['tag']
  debug{ "tag <#{ @tag }>" }

  @runner = @options['runner']
  debug{ "runner <#{ @runner }>" }

  @restartable = @options['restartable']
  debug{ "restartable <#{ @restartable }>" }

  @infile = @options['infile'] 
  debug{ "infile <#{ @infile }>" }

  @job_stdin = @options['stdin'] 
  debug{ "job_stdin <#{ @job_stdin }>" }

  @stage = @options['stage']
  debug{ "stage <#{ @stage }>" }

  @data = @options['data'] 
  debug{ "data <#{ @data }>" }

  if job_stdin == '-' and stdin?
    abort "cannot specify both jobs and job input on stdin"
  end

  jobs = [] 

  unless @argv.empty?
    job = Job::new
    job['command'] = @argv.join(' ') 
    job['priority'] = @priority
    job['tag'] = @tag 
    job['runner'] = @runner
    job['restartable'] = @restartable
    job['data'] = @data
    jobs << job
  end

  if @infile
    open(@infile) do |f|
      debug{ "reading jobs from <#{ @infile }>" }
      loadio f, @infile, jobs 
    end
  end

  if jobs.empty? and stdin? 
    debug{ "reading jobs from <stdin>" }
    loadio stdin, 'stdin', jobs 
  end

  abort "no jobs specified!" if jobs.empty?

  init_job_stdin!
  
  state = @stage ? 'holding' : 'pending'

  jobs.each do |job|
    job['state'] = state
    job['priority'] = @priority if @options.has_key?('priority')
    job['tag'] = @tag if @options.has_key?('tag')
    job['runner'] = @runner if @options.has_key?('runner')
    job['restartable'] = @restartable if @options.has_key?('restartable')
    job['stdin'] = @job_stdin if @job_stdin
    job['data'] = @data if @data
  end

#
# state + lambdas for submit process...
#

  list = []

  tmpfile =
    lambda do |basename|
      basename = File.basename basename.to_s
      Tempfile.new "#{ basename }_#{ Process.pid }_#{ rand.to_s }"
    end

  update_job = 
    lambda do |pjob, ujob|
      kvs, jid = {}, pjob['jid']
    # handle stdin
      pstdin, ustdin = pjob['stdin'], ujob['stdin']
      if pstdin || ustdin
        pbuf =
          if pstdin
            pstdin = @q.standard_in_4 jid
            IO.read pstdin if test ?e, pstdin
          end
        ubuf =
          if ustdin
            IO.read ustdin if test ?e, ustdin
          end
#y 'pbuf' => pbuf
#y 'ubuf' => ubuf
        f = ustdin ? open(ustdin,'w') : tmpfile[ustdin]
        begin
          f.write pbuf if pbuf
          f.write ubuf if pbuf
        ensure
          f.close
        end
        kvs['stdin'] = ujob['stdin'] = f.path
#y 'stdin' => ujob['stdin']
      end
    # handle other keys
      allowed = %w( priority runner restartable )
      allowed.each do |key|
        val = ujob[key]
        kvs[key] = val if val
      end
      @q.update(kvs, jid){|updated| list << updated}
    end

  submit_job = 
    lambda do |job|
      @q.submit(job){|| list << }
    end


#
# update or submit
#
  @q.transaction do
    pending = @q.list 'pending'

    pjobs, pcommands = {}, {}

    pending.each do |job|
      jid = job['jid']
      command = job['command'].strip
      tag = job['tag'].to_s.strip
      pjobs[jid] = job
      pcommands[[command, tag]] = jid
    end

    jobs.each do |job|
      jid = job['jid']
      command = job['command'].strip
      tag = job['tag'].to_s.strip
      if((jid = pcommands[[command, tag]]))
        update_job[ pjobs[jid], job ] 
      else
        submit_job[ job ]
      end
    end
  end

  list.each &dumping_yaml_tuples unless @options['quiet']
    
  jobs = nil
  list = nil
  self
#--}}}
end