Module: ConcurrentStream
- Defined in:
- lib/rbbt/util/misc/concurrent_stream.rb
Instance Attribute Summary collapse
-
#abort_callback ⇒ Object
Returns the value of attribute abort_callback.
-
#aborted ⇒ Object
Returns the value of attribute aborted.
-
#autojoin ⇒ Object
Returns the value of attribute autojoin.
-
#callback ⇒ Object
Returns the value of attribute callback.
-
#filename ⇒ Object
Returns the value of attribute filename.
-
#joined ⇒ Object
Returns the value of attribute joined.
-
#lockfile ⇒ Object
Returns the value of attribute lockfile.
-
#no_fail ⇒ Object
Returns the value of attribute no_fail.
-
#pair ⇒ Object
Returns the value of attribute pair.
-
#pids ⇒ Object
Returns the value of attribute pids.
-
#stream_exception ⇒ Object
Returns the value of attribute stream_exception.
-
#thread ⇒ Object
Returns the value of attribute thread.
-
#threads ⇒ Object
Returns the value of attribute threads.
Class Method Summary collapse
- .setup(stream, options = {}, &block) ⇒ Object
Instance Method Summary collapse
- #abort(exception = nil) ⇒ Object
- #abort_pids ⇒ Object
- #abort_threads(exception = nil) ⇒ Object
- #aborted? ⇒ Boolean
- #add_callback(&block) ⇒ Object
- #annotate(stream) ⇒ Object
- #clear ⇒ Object
- #close(*args) ⇒ Object
- #join ⇒ Object
- #join_callback ⇒ Object
- #join_pids ⇒ Object
- #join_threads ⇒ Object
- #joined? ⇒ Boolean
- #stream_raise_exception(exception) ⇒ Object
Instance Attribute Details
#abort_callback ⇒ Object
Returns the value of attribute abort_callback.
10 11 12 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 10 def abort_callback @abort_callback end |
#aborted ⇒ Object
Returns the value of attribute aborted.
10 11 12 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 10 def aborted @aborted end |
#autojoin ⇒ Object
Returns the value of attribute autojoin.
10 11 12 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 10 def autojoin @autojoin end |
#callback ⇒ Object
Returns the value of attribute callback.
10 11 12 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 10 def callback @callback end |
#filename ⇒ Object
Returns the value of attribute filename.
10 11 12 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 10 def filename @filename end |
#joined ⇒ Object
Returns the value of attribute joined.
10 11 12 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 10 def joined @joined end |
#lockfile ⇒ Object
Returns the value of attribute lockfile.
10 11 12 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 10 def lockfile @lockfile end |
#no_fail ⇒ Object
Returns the value of attribute no_fail.
10 11 12 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 10 def no_fail @no_fail end |
#pair ⇒ Object
Returns the value of attribute pair.
10 11 12 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 10 def pair @pair end |
#pids ⇒ Object
Returns the value of attribute pids.
10 11 12 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 10 def pids @pids end |
#stream_exception ⇒ Object
Returns the value of attribute stream_exception.
10 11 12 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 10 def stream_exception @stream_exception end |
#thread ⇒ Object
Returns the value of attribute thread.
10 11 12 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 10 def thread @thread end |
#threads ⇒ Object
Returns the value of attribute threads.
10 11 12 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 10 def threads @threads end |
Class Method Details
.setup(stream, options = {}, &block) ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 12

# Extends +stream+ with ConcurrentStream (unless it already is one) and
# attaches the threads, pids, callbacks and bookkeeping values given in
# +options+.
#
# Options read: :threads, :pids, :callback, :abort_callback, :filename,
# :autojoin, :lockfile, :no_fail and :pair. :threads and :pids may each be a
# single value or an Array; they are appended to whatever the stream already
# tracks. A block, when given, is used as the callback instead of :callback.
# New callbacks are chained after any callback already present on the stream
# instead of replacing it.
#
# Always resets +stream.aborted+ to false. Returns +stream+.
def self.setup(stream, options = {}, &block)
  # NOTE(review): the listing lost the helper name and its first argument
  # ("Misc. , :threads, ..."); Misc.process_options with the options hash is
  # the presumed original call -- confirm against the real file.
  threads, pids, callback, abort_callback, filename, autojoin, lockfile, no_fail, pair =
    Misc.process_options options, :threads, :pids, :callback, :abort_callback, :filename, :autojoin, :lockfile, :no_fail, :pair

  stream.extend ConcurrentStream unless ConcurrentStream === stream

  stream.threads ||= []
  stream.pids ||= []
  stream.threads.concat(Array === threads ? threads : [threads]) unless threads.nil?

  # Wrap a lone pid BEFORE asking about emptiness: calling empty? on a bare
  # Integer pid would raise NoMethodError.
  unless pids.nil?
    new_pids = Array === pids ? pids : [pids]
    stream.pids.concat(new_pids)
  end

  stream.autojoin = autojoin unless autojoin.nil?
  stream.no_fail = no_fail unless no_fail.nil?
  stream.pair = pair unless pair.nil?

  callback = block if block_given?

  if callback
    if stream.callback
      # Run the previously registered callback first, then the new one.
      old_callback = stream.callback
      stream.callback = Proc.new do
        old_callback.call
        callback.call
      end
    else
      stream.callback = callback
    end
  end

  if abort_callback
    if stream.abort_callback
      # Same chaining rule as for the regular callback.
      old_abort_callback = stream.abort_callback
      stream.abort_callback = Proc.new do
        old_abort_callback.call
        abort_callback.call
      end
    else
      stream.abort_callback = abort_callback
    end
  end

  stream.filename = filename unless filename.nil?
  stream.lockfile = lockfile unless lockfile.nil?
  stream.aborted = false

  stream
end
Instance Method Details
#abort(exception = nil) ⇒ Object
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 174

# Aborts the stream once; a second call only logs and returns nil.
#
# After marking the stream as aborted it closes it (if still open), runs the
# abort callback with +exception+, aborts the tracked threads and pids, drops
# both callbacks and forwards the abort to the paired stream, if any. The
# lockfile is unlocked on the way out even when one of those steps raises.
def abort(exception = nil)
  if @aborted
    Log.medium "Already aborted stream #{Misc.fingerprint self} [#{@aborted}]"
    return
  end

  Log.medium "Aborting stream #{Misc.fingerprint self} [#{@aborted}]"
  AbortedStream.setup(self, exception)
  @aborted = true

  begin
    close unless closed?
    @abort_callback.call exception if @abort_callback
    abort_threads(exception)
    abort_pids
    # Callbacks must not fire again after an abort.
    @callback = nil
    @abort_callback = nil
    @pair.abort exception if @pair
  ensure
    lockfile.unlock if lockfile && lockfile.locked?
  end
end
#abort_pids ⇒ Object
163 164 165 166 167 168 169 170 171 172 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 163

# Sends INT to every tracked pid and then forgets them all.
# A pid that no longer exists (Errno::ESRCH) is silently skipped.
# Returns the new, empty pid list.
def abort_pids
  if @pids
    @pids.each do |pid|
      begin
        Log.low "Killing PID #{pid} in ConcurrentStream #{filename}"
        Process.kill :INT, pid
      rescue Errno::ESRCH
        # Nothing to kill: the process is already gone.
      end
    end
  end

  @pids = []
end
#abort_threads(exception = nil) ⇒ Object
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 135

# Interrupts every tracked thread other than the calling one.
#
# First pass: raises +exception+ (or a fresh Aborted when none is given) in
# each thread. Second pass: a thread that is still alive is given one second
# and then killed; each thread is then joined, ignoring Aborted and logging
# any other exception the join surfaces.
#
# Does nothing when no threads are tracked.
def abort_threads(exception = nil)
  return unless @threads and @threads.any?

  # Label used in the log lines: the stream's filename when it has one,
  # otherwise the calling thread.
  name = Thread.current.inspect
  name = filename if filename

  Log.low "Aborting threads (#{name}) #{@threads.collect{|t| t.inspect } * ", "}"

  @threads.each do |t|
    next if t == Thread.current
    Log.debug "Aborting thread (#{name}) #{t.inspect} with exception: #{exception}"
    t.raise((exception.nil? ? Aborted.new : exception))
  end

  @threads.each do |t|
    next if t == Thread.current

    if t.alive?
      # Grace period before resorting to kill.
      sleep 1
      Log.low "Kill thread (#{name}) #{t.inspect}"
      t.kill
    end

    begin
      t.join unless t == Thread.current
    rescue Aborted
      # Expected: this is the exception raised into the thread above.
    rescue Exception
      # NOTE(review): the listing was truncated after "$!."; "message" is the
      # presumed original -- confirm against the real file.
      Log.debug "Thread (#{name}) exception: #{$!.message}"
    end
  end
end
#aborted? ⇒ Boolean
73 74 75 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 73

# Predicate form of the +aborted+ attribute: returns whatever @aborted holds
# (nil when it was never set).
def aborted?
  @aborted
end
#add_callback(&block) ⇒ Object
219 220 221 222 223 224 225 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 219

# Appends +block+ to the stream's callback: the resulting callback runs the
# previously registered one (if any) and then +block+.
#
# Called without a block it leaves the callback untouched, instead of
# installing a proc that would fail on nil.call when it is eventually run.
#
# Returns the callback now in effect.
def add_callback(&block)
  return callback unless block

  old_callback = callback
  @callback = Proc.new do
    old_callback.call if old_callback
    block.call
  end
end
#annotate(stream) ⇒ Object
60 61 62 63 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 60

# Sets +stream+ up as a ConcurrentStream carrying this stream's threads,
# pids, callbacks, filename, autojoin flag and lockfile. The no_fail and pair
# values are not forwarded. Returns +stream+.
def annotate(stream)
  inherited = {
    :threads => threads,
    :pids => pids,
    :callback => callback,
    :abort_callback => abort_callback,
    :filename => filename,
    :autojoin => autojoin,
    :lockfile => lockfile
  }
  ConcurrentStream.setup(stream, inherited)
  stream
end
#clear ⇒ Object
65 66 67 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 65

# Forgets the threads, pids and callbacks attached to the stream and resets
# the joined flag.
#
# The previous form assigned to local variables (threads, pids, ...), which
# left the stream's state untouched. Threads and pids are reset to empty
# arrays -- the same empty state join_threads, join_pids and abort_pids leave
# behind -- so code that iterates them keeps working. Returns nil.
def clear
  @threads = []
  @pids = []
  @callback = @abort_callback = @joined = nil
end
#close(*args) ⇒ Object
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 202

# Closes the stream by delegating to the underlying close.
#
# Without autojoin this is a plain pass-through. With autojoin, a failing
# close is logged, the stream is aborted and joined, and the error is handed
# to stream_raise_exception; in every case the stream is joined afterwards
# once it reports closed or at end of file.
def close(*args)
  return super(*args) unless autojoin

  begin
    super(*args)
  rescue
    Log.exception $!
    self.abort
    self.join
    stream_raise_exception $!
  ensure
    # closed? is checked first, so eof? is only asked of an open stream.
    self.join if self.closed? || self.eof?
  end
end
#join ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 122

# Waits for everything attached to the stream: joins threads, then pids,
# runs the callback and closes the stream if it is still open.
#
# Whatever happens, the stream ends up flagged as joined, the lockfile is
# released, and a recorded stream_exception is raised to the caller.
def join
  join_threads
  join_pids
  join_callback
  close unless closed?
ensure
  @joined = true
  lockfile.unlock if lockfile && lockfile.locked?
  raise stream_exception if stream_exception
end
#join_callback ⇒ Object
112 113 114 115 116 117 118 119 120 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 112

# Runs the callback once, provided one is set and the stream has not been
# joined yet. The callback is discarded afterwards, even if it raises.
# Returns the callback's result, or nil when nothing was run.
def join_callback
  return unless @callback && !joined?

  begin
    @callback.call
  ensure
    @callback = nil
  end
end
#join_pids ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 99

# Waits for each tracked pid. A process that did not finish successfully is
# reported through stream_raise_exception as a ProcessFailed, unless no_fail
# is set. Pids that are not children of this process (Errno::ECHILD) are
# skipped. The pid list is emptied afterwards.
#
# Returns nil when there was nothing to wait for.
def join_pids
  return unless @pids && @pids.any?

  @pids.each do |pid|
    begin
      Process.waitpid(pid, Process::WUNTRACED)
      next if $?.success? || no_fail

      stream_raise_exception ProcessFailed.new("Error joining process #{pid} in #{self.filename || self.inspect}")
    rescue Errno::ECHILD
      # Not our child (or already reaped): nothing to wait for.
    end
  end

  @pids = []
end
#join_threads ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 77

# Joins every tracked thread except the calling one and then empties the
# thread list.
#
# A thread whose value is a Process::Status that is not successful counts as
# a failure (ProcessFailed) unless no_fail is set. Any exception surfaced
# while joining is logged; with no_fail it stops there, otherwise it is
# passed to stream_raise_exception.
def join_threads
  if @threads && @threads.any?
    @threads.each do |thread|
      next if thread == Thread.current

      begin
        thread.join
        status = thread.value
        if Process::Status === status
          unless status.success? || no_fail
            raise ProcessFailed.new("Error joining process #{thread.pid} in #{self.filename || self.inspect}")
          end
        end
      rescue Exception
        if no_fail
          Log.low "Not failing on exception joining thread in ConcurrenStream: #{filename}"
        else
          Log.low "Exception joining thread in ConcurrenStream: #{filename}"
          stream_raise_exception $!
        end
      end
    end
  end

  @threads = []
end
#joined? ⇒ Boolean
69 70 71 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 69

# Predicate form of the +joined+ attribute: returns whatever @joined holds
# (nil when it was never set).
def joined?
  @joined
end
#stream_raise_exception(exception) ⇒ Object
227 228 229 230 231 232 233 234 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 227

# Propagates +exception+ to the stream: raises it in every tracked thread,
# records it as stream_exception and aborts the stream.
#
# Tolerates a stream with no thread list (threads is nil); previously that
# raised NoMethodError before the exception was recorded or the stream
# aborted.
def stream_raise_exception(exception)
  (threads || []).each do |thread|
    thread.raise exception
  end
  self.stream_exception = exception
  self.abort
end