Class: RQ::JobRunnerDaemon
- Inherits: Object
- Includes: Logging
- Defined in: lib/rq/jobrunnerdaemon.rb
Overview
As stated in the description of the JobRunner class, the JobRunnerDaemon is a helper daemon that runs as a DRb object. Its primary responsibility is simply to enable forks to occur in a different address space than the one doing the SQLite transaction. In addition to forking to create child processes in which to run jobs, the JobRunnerDaemon also provides facilities to wait for these children.
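The arrangement described above can be sketched in a few lines. This is a minimal illustration, not the RQ implementation: `ForkHelper`, `run_child`, and `start_fork_helper` are hypothetical names, and the helper returns plain integers instead of a `Process::Status` so that replies marshal cleanly over DRb.

```ruby
require 'drb'
require 'drb/unix'
require 'tmpdir'

# Hypothetical front object standing in for JobRunnerDaemon.
class ForkHelper
  # Fork a child (inside the helper process) that exits with +code+.
  def run_child(code)
    fork { exit!(code) }
  end

  # Wait for any child of the helper process.
  def wait2
    pid, status = Process.wait2
    [pid, status.exitstatus]
  end
end

# Start the helper in its own process; returns [helper_pid, proxy, socket].
def start_fork_helper
  socket = File.join(Dir.tmpdir, "fork_helper_#{Process.pid}_#{rand(1_000_000)}")
  uri = "drbunix://#{socket}"
  r, w = IO.pipe
  helper_pid = fork do
    begin
      r.close
      DRb.start_service(uri, ForkHelper.new)
      w.write('ready')          # tell the parent the socket is live
      w.close
      DRb.thread.join
    ensure
      exit!
    end
  end
  w.close
  ready = r.read                # returns once the child closes its end
  r.close
  raise 'fork helper failed to start' unless ready == 'ready'
  [helper_pid, DRbObject.new_with_uri(uri), socket]
end

if __FILE__ == $PROGRAM_NAME
  helper_pid, helper, socket = start_fork_helper
  child = helper.run_child(3)   # the fork happens in the helper, not here
  p helper.wait2
  Process.kill('KILL', helper_pid)
  Process.wait(helper_pid)
  File.delete(socket) if File.exist?(socket)
end
```

The caller never forks to run a job itself; it only sends messages to the proxy, so whatever state it holds (an open database transaction, for example) is not duplicated into the job's process.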
Constant Summary
Constants included from Logging
Logging::DIV0, Logging::DIV1, Logging::DIV2, Logging::DIV3, Logging::EOL, Logging::SEC0, Logging::SEC1, Logging::SEC2, Logging::SEC3
Instance Attribute Summary
- #pid ⇒ Object
  Returns the value of attribute pid.
- #q ⇒ Object (readonly)
  Returns the value of attribute q.
- #runners ⇒ Object (readonly)
  Returns the value of attribute runners.
- #uri ⇒ Object
  Returns the value of attribute uri.
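Class Method Summary
- .daemon(*a, &b) ⇒ Object
  Forks a child process that serves a JobRunnerDaemon over a drbunix socket and returns a DRbObject proxy for it in the parent.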
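Instance Method Summary
- #initialize(q) ⇒ JobRunnerDaemon (constructor)
  A new instance of JobRunnerDaemon.
- #install_signal_handlers ⇒ Object
  Ignores the TERM, INT, and HUP signals.
- #runner(job) ⇒ Object
  Creates a JobRunner for job and registers it in runners under its pid.
- #shutdown ⇒ Object
  Starts a thread that reaps all remaining runners and then exits the daemon.
- #wait ⇒ Object
  Waits for any child, removes it from runners, and returns its pid.
- #wait2 ⇒ Object
  Waits for any child, removes it from runners, and returns [pid, status].
- #waitpid(pid = -1, flags = 0) ⇒ Object
  Waits for the given child, removes it from runners, and returns its pid.
- #waitpid2(pid = -1, flags = 0) ⇒ Object
  Waits for the given child, removes it from runners, and returns [pid, status].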
Methods included from Logging
Methods included from Logging::LogMethods
#debug, #error, #fatal, #info, #logerr, #logger, #logger=, #warn
Constructor Details
#initialize(q) ⇒ JobRunnerDaemon
Returns a new instance of JobRunnerDaemon.
# File 'lib/rq/jobrunnerdaemon.rb', line 109
def initialize q
#--{{{
  @q = q
  @runners = {}
  @uri = nil
  @pid = Process::pid
#--}}}
end
Instance Attribute Details
#pid ⇒ Object
Returns the value of attribute pid.
# File 'lib/rq/jobrunnerdaemon.rb', line 107
def pid
  @pid
end
#q ⇒ Object (readonly)
Returns the value of attribute q.
# File 'lib/rq/jobrunnerdaemon.rb', line 105
def q
  @q
end
#runners ⇒ Object (readonly)
Returns the value of attribute runners.
# File 'lib/rq/jobrunnerdaemon.rb', line 106
def runners
  @runners
end
#uri ⇒ Object
Returns the value of attribute uri.
# File 'lib/rq/jobrunnerdaemon.rb', line 108
def uri
  @uri
end
Class Method Details
.daemon(*a, &b) ⇒ Object
Builds a JobRunnerDaemon and forks. The child tries up to 42 times to start a DRb service for it on a drbunix socket under Dir::tmpdir, writes the socket path to a pipe, starts a thread that checks every 42 seconds that the parent is still alive, and then joins the DRb thread. The parent reads the socket path from the pipe, registers an at_exit hook that removes the socket file, and returns a DRbObject proxy with pid and uri set. Raises "failed to start job runner daemon" if no usable socket is reported.
# File 'lib/rq/jobrunnerdaemon.rb', line 29
def daemon(*a,&b)
#--{{{
  jrd = new(*a, &b)

  r, w = IO::pipe

  unless((pid = fork)) # child
    $0 = "#{ self }".gsub(%r/[^a-zA-Z]+/,'_').downcase
    begin
      r.close
      n = 0
      uri = nil
      socket = nil

      42.times do
        begin
          s = "%s/%s_%s_%s_%s" % [Dir::tmpdir, File::basename($0), Process::ppid, n, rand(42)]
          u = "drbunix://#{ s }"
          DRb::start_service u, jrd
          socket = s
          uri = u
          break
        rescue Errno::EADDRINUSE
          n += 1
        end
      end

      if socket and uri
        w.write socket
        w.close
        pid = Process::pid
        ppid = Process::ppid
        cur = Thread::current
        Thread::new(pid, ppid, cur) do |pid, ppid, cur|
          loop do
            begin
              Process::kill 0, ppid
              sleep 42
            rescue
              cur.raise "parent <#{ ppid }> died unexpectedly"
            end
          end
        end
        DRb::thread.join
      else
        w.close
      end
    ensure
      exit!
    end
  else # parent
    w.close
    socket = r.read
    r.close

    if socket and File::exist?(socket)
      at_exit{ FileUtils::rm_f socket }
      uri = "drbunix://#{ socket }"
      #
      # starting this on localhost avoids dns lookups!
      #
      DRb::start_service 'druby://localhost:0', nil
      jrd = DRbObject::new nil, uri
      jrd.pid = pid
      jrd.uri = uri
    else
      raise "failed to start job runner daemon"
    end
  end

  return jrd
#--}}}
end
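The watchdog thread in the child relies on signal 0: `Process.kill(0, pid)` delivers nothing but still raises if the target process is gone. A minimal sketch of that check in isolation follows; `process_alive?` is a hypothetical helper, not part of RQ.

```ruby
# Hypothetical helper: probe a pid with signal 0, which performs the
# existence and permission checks without delivering a signal.
def process_alive?(pid)
  Process.kill(0, pid)
  true
rescue Errno::ESRCH
  false            # no such process
rescue Errno::EPERM
  true             # process exists but belongs to another user
end

if __FILE__ == $PROGRAM_NAME
  puts process_alive?(Process.pid)   # the current process is always alive
end
```

The source rescues every error from the probe and treats it as the parent having died. Distinguishing `ESRCH` from `EPERM`, as here, is a stricter variant.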
Instance Method Details
#install_signal_handlers ⇒ Object
Ignores the TERM, INT, and HUP signals by trapping each of them with 'SIG_IGN'.
# File 'lib/rq/jobrunnerdaemon.rb', line 183
def install_signal_handlers
#--{{{
  %w(TERM INT HUP).each{|sig| trap sig, 'SIG_IGN'}
#--}}}
end
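The effect of trapping with 'SIG_IGN' can be checked in a throwaway child process. This is a small sketch under that assumption; `exits_cleanly_when_ignoring?` is a hypothetical helper, and the fork keeps the experiment away from the calling process's own handlers.

```ruby
# Hypothetical helper: in a forked child, ignore +signals+, send the child
# the +deliver+ signal, and report whether it still exited normally.
def exits_cleanly_when_ignoring?(signals, deliver)
  pid = fork do
    signals.each { |sig| trap(sig, 'SIG_IGN') }
    Process.kill(deliver, Process.pid)   # discarded because it is ignored
    exit!(0)
  end
  _, status = Process.waitpid2(pid)
  status.exited? && status.exitstatus.zero?
end

if __FILE__ == $PROGRAM_NAME
  puts exits_cleanly_when_ignoring?(%w(TERM INT HUP), 'HUP')
end
```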
#runner(job) ⇒ Object
Creates a JobRunner for job and registers it in runners under its pid. If creation fails with Errno::ENOMEM or Errno::EAGAIN, a garbage collection pass is forced and the creation is retried once; a second failure is re-raised.
# File 'lib/rq/jobrunnerdaemon.rb', line 117
def runner job
#--{{{
  r = nil
  retried = false
  begin
    r = JobRunner::new @q, job
  rescue Errno::ENOMEM, Errno::EAGAIN
    GC::start
    unless retried
      retried = true
      retry
    else
      raise
    end
  end
  @runners[r.pid] = r
  r
#--}}}
end
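The retry-once pattern used here can be isolated into a small helper. The following is a sketch only: `with_one_gc_retry` is a hypothetical name, and the block stands in for `JobRunner::new`.

```ruby
# Hypothetical helper: run the block; on a resource-exhaustion error force a
# GC pass and retry exactly once, re-raising if the second attempt also fails.
def with_one_gc_retry
  retried = false
  begin
    yield
  rescue Errno::ENOMEM, Errno::EAGAIN
    GC.start
    raise if retried    # second failure: give up
    retried = true
    retry               # re-runs the begin block, calling the block again
  end
end

if __FILE__ == $PROGRAM_NAME
  attempts = 0
  result = with_one_gc_retry do
    attempts += 1
    raise Errno::EAGAIN if attempts == 1   # simulated transient failure
    :spawned
  end
  p [result, attempts]
end
```

The `retried` flag is what bounds the loop: without it, a persistent `ENOMEM` would retry forever.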
#shutdown ⇒ Object
Starts a thread that waits for every remaining runner to exit, removing each from runners, and then kills the DRb thread and terminates the daemon with exit!.
# File 'lib/rq/jobrunnerdaemon.rb', line 166
def shutdown
#--{{{
  @death = Thread::new do
    begin
      while not @runners.empty?
        pid = Process::wait
        @runners.delete pid
      end
    ensure
      #sleep 4.2
      DRb::thread.kill
      Thread::main
      exit!
    end
  end
#--}}}
end
#wait ⇒ Object
Waits for any child process to exit, removes it from runners, and returns its pid.
# File 'lib/rq/jobrunnerdaemon.rb', line 136
def wait
#--{{{
  pid = Process::wait
  @runners.delete pid
  pid
#--}}}
end
#wait2 ⇒ Object
Waits for any child process to exit, removes it from runners, and returns [pid, status].
# File 'lib/rq/jobrunnerdaemon.rb', line 143
def wait2
#--{{{
  pid, status = Process::wait2
  @runners.delete pid
  [pid, status]
#--}}}
end
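#waitpid(pid = -1, flags = 0) ⇒ Object
Waits for the given child, removes it from runners, and returns its pid. The pid argument may be an integer or any object that responds to pid; flags is passed through to Process::waitpid.
# File 'lib/rq/jobrunnerdaemon.rb', line 150
def waitpid pid = -1, flags = 0
#--{{{
  pid = pid.pid if pid.respond_to? 'pid'
  pid = Process::waitpid pid, flags
  @runners.delete pid
  pid
#--}}}
end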
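#waitpid2(pid = -1, flags = 0) ⇒ Object
Waits for the given child, removes it from runners, and returns [pid, status]. The pid argument may be an integer or any object that responds to pid; flags is passed through to Process::waitpid2.
# File 'lib/rq/jobrunnerdaemon.rb', line 158
def waitpid2 pid = -1, flags = 0
#--{{{
  pid = pid.pid if pid.respond_to? 'pid'
  pid, status = Process::waitpid2 pid, flags
  @runners.delete pid
  [pid, status]
#--}}}
end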
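The wait methods share one shape: normalize the argument, wait, and drop the reaped pid from the registry. A self-contained sketch of that shape follows; `Runner` and `reap` are hypothetical stand-ins for JobRunner and the daemon's methods, and the `children` hash plays the role of `@runners`.

```ruby
# Hypothetical stand-in for JobRunner: anything that responds to #pid.
Runner = Struct.new(:pid)

# Wait for +target+ (an Integer pid, or an object responding to #pid),
# remove the reaped pid from +children+, and return [pid, status].
def reap(children, target = -1, flags = 0)
  target = target.pid if target.respond_to?(:pid)
  pid, status = Process.waitpid2(target, flags)
  children.delete(pid)
  [pid, status]
end

if __FILE__ == $PROGRAM_NAME
  children = {}
  pid = fork { exit!(7) }
  children[pid] = Runner.new(pid)
  reaped, status = reap(children, children[pid])
  p [reaped == pid, status.exitstatus, children.empty?]
end
```

With `Process::WNOHANG` in `flags`, `Process.waitpid2` returns nil when no child has exited yet, so `reap` returns `[nil, nil]` and leaves the registry unchanged.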