Class: RQ::JobRunnerDaemon

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/rq/jobrunnerdaemon.rb

Overview

As stated in the description of the JobRunner class, the JobRunnerDaemon is a helper daemon that runs as a DRb object. Its primary responsibility is simply to enable forks to occur in a different address space than the one doing the sqlite transaction. In addition to forking to create child processes in which to run jobs, the JobRunnerDaemon also provides facilities to wait for these children.

Constant Summary

Constants included from Logging

Logging::DIV0, Logging::DIV1, Logging::DIV2, Logging::DIV3, Logging::EOL, Logging::SEC0, Logging::SEC1, Logging::SEC2, Logging::SEC3

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

append_features

Methods included from Logging::LogMethods

#debug, #error, #fatal, #info, #logerr, #logger, #logger=, #warn

Constructor Details

#initialize(q) ⇒ JobRunnerDaemon

Returns a new instance of JobRunnerDaemon.



109
110
111
112
113
114
115
116
# File 'lib/rq/jobrunnerdaemon.rb', line 109

# Sets up the daemon's bookkeeping state.
#
# q - the queue object; it is stored as-is and later handed to every
#     JobRunner created by this daemon.
#
# The runner table starts out empty, the uri is unknown (nil) and the pid
# is that of the process constructing the object.
def initialize q
#--{{{
  @pid     = Process.pid
  @uri     = nil
  @runners = {}
  @q       = q
#--}}}
end

Instance Attribute Details

#pid ⇒ Object

Returns the value of attribute pid.



107
108
109
# File 'lib/rq/jobrunnerdaemon.rb', line 107

# Reader for the process id stored on this daemon object.
#
# Returns the current value of @pid.
def pid()
  return @pid
end

#q ⇒ Object (readonly)

Returns the value of attribute q.



105
106
107
# File 'lib/rq/jobrunnerdaemon.rb', line 105

# Reader for the queue object this daemon was constructed with.
#
# Returns the current value of @q.
def q()
  return @q
end

#runners ⇒ Object (readonly)

Returns the value of attribute runners.



106
107
108
# File 'lib/rq/jobrunnerdaemon.rb', line 106

# Reader for the table of runners tracked by this daemon, keyed by pid.
#
# Returns the current value of @runners.
def runners()
  return @runners
end

#uri ⇒ Object

Returns the value of attribute uri.



108
109
110
# File 'lib/rq/jobrunnerdaemon.rb', line 108

# Reader for the DRb uri stored on this daemon object.
#
# Returns the current value of @uri (nil until one has been assigned).
def uri()
  return @uri
end

Class Method Details

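.daemon(*a, &b) ⇒ Object

Forks a child process that serves a new JobRunnerDaemon over DRb on a unix socket, and returns a DRbObject proxy for it to the parent.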



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/rq/jobrunnerdaemon.rb', line 29

# Starts a job runner daemon in a forked child process and returns a handle
# to it.
#
# An instance is built with new(*a, &b) before forking. The child exports
# that instance through DRb on a unix-domain socket created under
# Dir::tmpdir and reports the socket path to the parent over a pipe. The
# parent wraps the resulting uri in a DRbObject, assigns the child's pid and
# the uri through that proxy, and returns the proxy.
#
# a, b - arguments and block forwarded unchanged to new.
#
# Returns the DRbObject proxy (parent side only; the child never returns
# from this method because it always ends in exit!).
# Raises RuntimeError ("failed to start job runner daemon") in the parent
# when no existing socket path was received from the child.
def daemon(*a,&b)
#--{{{
  jrd = new(*a, &b) 

  # pipe used by the child to hand the socket path back to the parent
  r, w = IO::pipe

  unless((pid = fork)) # child
    # rename the process after the receiver: every run of non-letters in its
    # string form becomes '_' and the result is downcased
    $0 = "#{ self }".gsub(%r/[^a-zA-Z]+/,'_').downcase
    begin
      # the child only writes to the pipe
      r.close
      n = 0
      uri = nil
      socket = nil

      # make up to 42 attempts to start the DRb service, generating a new
      # socket path each time the previous one was already in use
      42.times do
        begin
          s = "%s/%s_%s_%s_%s" %
            [Dir::tmpdir, File::basename($0), Process::ppid, n, rand(42)]
          u = "drbunix://#{ s }"
          DRb::start_service u, jrd 
          socket = s
          uri = u
          break
        rescue Errno::EADDRINUSE
          n += 1
        end
      end

      if socket and uri
        # report the socket path; closing the write end lets the parent's
        # r.read reach end-of-file and return
        w.write socket 
        w.close
        pid = Process::pid
        ppid = Process::ppid
        cur = Thread::current
        # watchdog: probe the parent with signal 0 every 42 seconds; any
        # StandardError from the probe is turned into an exception raised in
        # the thread that called daemon (cur), which is the one blocked in
        # the join below
        # NOTE(review): the loop keeps running after cur.raise - presumably
        # harmless because the ensure clause below exits the process
        Thread::new(pid, ppid, cur) do |pid, ppid, cur|
          loop do
            begin
              Process::kill 0, ppid
              sleep 42
            rescue
              cur.raise "parent <#{ ppid }> died unexpectedly" 
            end
          end
        end
        # block here for as long as the DRb server thread is running
        DRb::thread.join
      else
        # no service could be started: close the pipe without writing so the
        # parent reads an empty string and raises
        w.close
      end
    ensure
      # whatever happened above, the child must never fall through into the
      # caller's code
      exit!
    end
  else # parent
    # close our copy of the write end, otherwise r.read would never see
    # end-of-file
    w.close
    socket = r.read
    r.close

    if socket and File::exist?(socket)
      # remove the socket file when the parent process exits
      at_exit{ FileUtils::rm_f socket }
      uri = "drbunix://#{ socket }"
    #
    # starting this on localhost avoids dns lookups!
    #
      DRb::start_service 'druby://localhost:0', nil
      # from here on jrd is a proxy for the uri served by the child, no
      # longer the local instance created at the top of the method
      jrd = DRbObject::new nil, uri
      # both assignments are calls made through the proxy - presumably they
      # are executed on the daemon-side object; confirm against DRb docs
      jrd.pid = pid
      jrd.uri = uri
    else
      raise "failed to start job runner daemon"
    end
  end

  return jrd
#--}}}
end

Instance Method Details

#install_signal_handlers ⇒ Object

Ignores the TERM, INT and HUP signals in the current process.



183
184
185
186
187
# File 'lib/rq/jobrunnerdaemon.rb', line 183

# Makes the current process ignore the TERM, INT and HUP signals.
#
# Returns the array of signal names that were processed.
def install_signal_handlers
#--{{{
  ['TERM', 'INT', 'HUP'].each do |signal_name|
    trap(signal_name, 'SIG_IGN')
  end
#--}}}
end

#runner(job) ⇒ Object

Creates a JobRunner for the given job, records it in runners under its pid, and returns it.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/rq/jobrunnerdaemon.rb', line 117

# Creates a JobRunner for +job+ and registers it with this daemon.
#
# job - the job handed to JobRunner::new together with @q.
#
# If construction raises Errno::ENOMEM or Errno::EAGAIN a garbage collection
# is run and construction is attempted exactly one more time; a second
# failure is re-raised to the caller (also after a garbage collection).
#
# Returns the new runner, which is also stored in @runners under its pid.
def runner job
#--{{{
  attempts = 0
  job_runner =
    begin
      JobRunner.new(@q, job)
    rescue Errno::ENOMEM, Errno::EAGAIN
      GC.start
      attempts += 1
      raise if attempts > 1
      retry
    end
  @runners[job_runner.pid] = job_runner
  job_runner
#--}}}
end

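#shutdown ⇒ Object

Starts a thread that waits for all remaining runners to finish, then kills the DRb thread and exits the process.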



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/rq/jobrunnerdaemon.rb', line 166

# Begins an asynchronous shutdown of the daemon process.
#
# A background thread reaps child processes until the runner table is
# empty, then kills the DRb server thread (if there is one) and terminates
# the whole process with exit!. The same teardown happens if waiting for a
# child raises (for instance when there are no children left to wait for).
#
# The process exit is placed in its own ensure clause so that it still
# happens when no DRb service is running or when killing the DRb thread
# raises; otherwise the daemon would stay alive with nothing left to do.
#
# Returns the Thread performing the shutdown (also kept in @death).
def shutdown
#--{{{
  @death =
    Thread::new do
      begin
        until @runners.empty?
          pid = Process::wait
          @runners.delete pid
        end
      ensure
        begin
          #sleep 4.2
          drb_thread = DRb::thread
          drb_thread.kill if drb_thread
        ensure
          # exit! ends the process from whichever thread calls it
          exit!
        end
      end
    end
#--}}}
end

#wait ⇒ Object

Waits for any child process to exit, removes it from runners, and returns its pid.



136
137
138
139
140
141
142
# File 'lib/rq/jobrunnerdaemon.rb', line 136

# Blocks until any child process exits (see Process.wait) and forgets the
# runner registered under that pid.
#
# Returns the pid of the child that exited.
def wait
#--{{{
  reaped = Process.wait
  @runners.delete(reaped)
  reaped
#--}}}
end

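#wait2 ⇒ Object

Waits for any child process to exit, removes it from runners, and returns its pid together with its exit status.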



143
144
145
146
147
148
149
# File 'lib/rq/jobrunnerdaemon.rb', line 143

# Blocks until any child process exits (see Process.wait2) and forgets the
# runner registered under that pid.
#
# Returns a two element array: the pid of the child and its exit status.
def wait2
#--{{{
  reaped, exit_status = Process.wait2
  @runners.delete(reaped)
  [reaped, exit_status]
#--}}}
end

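#waitpid(pid = -1, flags = 0) ⇒ Object

Waits for the given child process (a pid, or any object responding to pid), removes it from runners, and returns its pid.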



150
151
152
153
154
155
156
157
# File 'lib/rq/jobrunnerdaemon.rb', line 150

# Waits for a specific child process (see Process.waitpid) and forgets the
# runner registered under the pid that was reaped.
#
# pid   - a process id, or any object responding to 'pid', in which case
#         its pid is used (default: -1).
# flags - passed through to Process.waitpid (default: 0).
#
# Returns whatever Process.waitpid returned.
def waitpid pid = -1, flags = 0
#--{{{
  target = pid.respond_to?('pid') ? pid.pid : pid
  reaped = Process.waitpid(target, flags)
  @runners.delete(reaped)
  reaped
#--}}}
end

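#waitpid2(pid = -1, flags = 0) ⇒ Object

Waits for the given child process (a pid, or any object responding to pid), removes it from runners, and returns its pid together with its exit status.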



158
159
160
161
162
163
164
165
# File 'lib/rq/jobrunnerdaemon.rb', line 158

# Waits for a specific child process (see Process.waitpid2) and forgets the
# runner registered under the pid that was reaped.
#
# pid   - a process id, or any object responding to 'pid', in which case
#         its pid is used (default: -1).
# flags - passed through to Process.waitpid2 (default: 0).
#
# Returns a two element array: the reaped pid and its exit status.
def waitpid2 pid = -1, flags = 0
#--{{{
  target = pid.respond_to?('pid') ? pid.pid : pid
  reaped, exit_status = Process.waitpid2(target, flags)
  @runners.delete(reaped)
  [reaped, exit_status]
#--}}}
end