Module: ConcurrentStream
- Defined in:
- lib/scout/concurrent_stream.rb
Instance Attribute Summary
-
#abort_callback ⇒ Object
Returns the value of attribute abort_callback.
-
#aborted ⇒ Object
Returns the value of attribute aborted.
-
#autojoin ⇒ Object
Returns the value of attribute autojoin.
-
#callback ⇒ Object
Returns the value of attribute callback.
-
#filename ⇒ Object
Returns the value of attribute filename.
-
#joined ⇒ Object
Returns the value of attribute joined.
-
#lock ⇒ Object
Returns the value of attribute lock.
-
#log ⇒ Object
Returns the value of attribute log.
-
#next ⇒ Object
Returns the value of attribute next.
-
#no_fail ⇒ Object
Returns the value of attribute no_fail.
-
#pair ⇒ Object
Returns the value of attribute pair.
-
#pids ⇒ Object
Returns the value of attribute pids.
-
#std_err ⇒ Object
Returns the value of attribute std_err.
-
#stream_exception ⇒ Object
Returns the value of attribute stream_exception.
-
#thread ⇒ Object
Returns the value of attribute thread.
-
#threads ⇒ Object
Returns the value of attribute threads.
Class Method Summary
- .process_stream(stream, close: true, join: true, message: "process_stream", **kwargs, &block) ⇒ Object
- .setup(stream, options = {}, &block) ⇒ Object
Instance Method Summary
- #abort(exception = nil) ⇒ Object
- #abort_pids ⇒ Object
- #abort_threads(exception = nil) ⇒ Object
- #aborted? ⇒ Boolean
- #add_callback(&block) ⇒ Object
- #annotate(stream) ⇒ Object
- #clear ⇒ Object
- #close(*args) ⇒ Object
- #join ⇒ Object
- #join_callback ⇒ Object
- #join_pids ⇒ Object
- #join_threads ⇒ Object
- #joined? ⇒ Boolean
- #read(*args) ⇒ Object
- #stream_raise_exception(exception) ⇒ Object
Instance Attribute Details
#abort_callback ⇒ Object
Returns the value of attribute abort_callback.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @abort_callback
def abort_callback
  @abort_callback
end
#aborted ⇒ Object
Returns the value of attribute aborted.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @aborted
def aborted
  @aborted
end
#autojoin ⇒ Object
Returns the value of attribute autojoin.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @autojoin
def autojoin
  @autojoin
end
#callback ⇒ Object
Returns the value of attribute callback.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @callback
def callback
  @callback
end
#filename ⇒ Object
Returns the value of attribute filename.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @filename
def filename
  @filename
end
#joined ⇒ Object
Returns the value of attribute joined.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @joined
def joined
  @joined
end
#lock ⇒ Object
Returns the value of attribute lock.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @lock
def lock
  @lock
end
#log ⇒ Object
Returns the value of attribute log.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @log
def log
  @log
end
#next ⇒ Object
Returns the value of attribute next.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @next
def next
  @next
end
#no_fail ⇒ Object
Returns the value of attribute no_fail.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @no_fail
def no_fail
  @no_fail
end
#pair ⇒ Object
Returns the value of attribute pair.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @pair
def pair
  @pair
end
#pids ⇒ Object
Returns the value of attribute pids.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @pids
def pids
  @pids
end
#std_err ⇒ Object
Returns the value of attribute std_err.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @std_err
def std_err
  @std_err
end
#stream_exception ⇒ Object
Returns the value of attribute stream_exception.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @stream_exception
def stream_exception
  @stream_exception
end
#thread ⇒ Object
Returns the value of attribute thread.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @thread
def thread
  @thread
end
#threads ⇒ Object
Returns the value of attribute threads.
12 13 14 |
# File 'lib/scout/concurrent_stream.rb', line 12
# Attribute reader.
#
# @return [Object] whatever is currently stored in @threads
def threads
  @threads
end
Class Method Details
.process_stream(stream, close: true, join: true, message: "process_stream", **kwargs, &block) ⇒ Object
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 |
# File 'lib/scout/concurrent_stream.rb', line 272
# Registers +stream+ through ConcurrentStream.setup, yields to the caller's
# block, and afterwards closes and joins the stream.
#
# @param stream [Object] stream to set up and clean up
# @param close [Boolean] close the stream after the block when it responds
#   to #close and does not already report #closed?
# @param join [Boolean] join the stream after the block when it responds
#   to #join and is not yet #joined?
# @param message [String] label interpolated into the failure log lines
# @param kwargs [Hash] remaining keywords, forwarded to ConcurrentStream.setup
# @return [Object] the value of the block
# @raise [Exception] any exception from the block or the cleanup is logged
#   via Log.low, passed to stream.abort (when supported and not yet aborted)
#   and re-raised
def self.process_stream(stream, close: true, join: true, message: "process_stream", **kwargs, &block)
  # Setup deliberately sits outside the rescued region below.
  ConcurrentStream.setup(stream, **kwargs)

  begin
    begin
      yield
    ensure
      if close && stream.respond_to?(:close)
        already_closed = stream.respond_to?(:closed?) && stream.closed?
        stream.close unless already_closed
      end
      if join && stream.respond_to?(:join)
        stream.join unless stream.joined?
      end
    end
  rescue Aborted => aborted
    Log.low "Aborted #{message}: #{aborted.message}"
    stream.abort(aborted) if stream.respond_to?(:abort) && !stream.aborted?
    raise aborted
  rescue Exception => failure
    Log.low "Exception #{message}: #{failure.message}"
    stream.abort(failure) if stream.respond_to?(:abort) && !stream.aborted?
    raise failure
  end
end
.setup(stream, options = {}, &block) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/scout/concurrent_stream.rb', line 14
# Extends +stream+ with ConcurrentStream (unless it already is one) and
# records the resources and callbacks associated with it.
#
# Recognised option keys: :threads, :pids, :callback, :abort_callback,
# :filename, :autojoin, :lock, :no_fail, :pair and :next. They are pulled out
# of +options+ with IndiferentHash.process_options.
#
# Calling setup again on the same stream accumulates: threads and pids are
# appended, and a new callback / abort callback is chained after the one
# already present. A block, when given, takes the place of the :callback
# option. Every call resets std_err to "" and aborted to false.
#
# @param stream [Object] the object to extend
# @param options [Hash] see the keys above
# @return [Object] the same +stream+
def self.setup(stream, options = {}, &block)
  threads, pids, callback, abort_callback, filename, autojoin, lock, no_fail, pair, next_stream =
    IndiferentHash.process_options(
      options,
      :threads, :pids, :callback, :abort_callback, :filename,
      :autojoin, :lock, :no_fail, :pair, :next
    )

  stream.extend ConcurrentStream unless ConcurrentStream === stream

  stream.threads ||= []
  stream.pids ||= []
  stream.threads.concat(Array === threads ? threads : [threads]) unless threads.nil?
  # A single pid may be passed bare; wrap it instead of calling #empty? on it.
  # Concatenating an empty array is a no-op, so no emptiness check is needed.
  stream.pids.concat(Array === pids ? pids : [pids]) unless pids.nil?
  stream.autojoin = autojoin unless autojoin.nil?
  stream.no_fail = no_fail unless no_fail.nil?
  stream.std_err = ""

  stream.next = next_stream unless next_stream.nil?
  stream.pair = pair unless pair.nil?

  callback = block if block_given?
  if callback
    if stream.callback
      # Chain: previously registered callback first, then the new one.
      old_callback = stream.callback
      stream.callback = Proc.new do
        old_callback.call
        callback.call
      end
    else
      stream.callback = callback
    end
  end

  if abort_callback
    if stream.abort_callback
      old_abort_callback = stream.abort_callback
      stream.abort_callback = Proc.new do
        old_abort_callback.call
        abort_callback.call
      end
    else
      stream.abort_callback = abort_callback
    end
  end

  stream.filename = filename if filename

  stream.lock = lock unless lock.nil?

  stream.aborted = false

  stream
end
Instance Method Details
#abort(exception = nil) ⇒ Object
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/scout/concurrent_stream.rb', line 186
# Aborts the stream: records +exception+ as the stream exception (unless one
# is already recorded), runs the abort callback, aborts threads and pids,
# drops both callbacks and aborts the paired stream when there is one that
# has not been aborted. Whatever happens, the stream is then closed (if not
# already closed) and the lock released (if held).
#
# A second call on an already aborted stream only logs and returns.
#
# @param exception [Exception, nil] reason for the abort
# @return [Object, nil]
def abort(exception = nil)
  self.stream_exception ||= exception

  if @aborted
    Log.medium "Already aborted stream #{Log.fingerprint self} [#{@aborted}]"
    return
  end
  Log.medium "Aborting stream #{Log.fingerprint self} [#{@aborted}]"

  AbortedStream.setup(self, exception)
  @aborted = true

  begin
    @abort_callback.call exception if @abort_callback

    abort_threads(exception)
    abort_pids

    @callback = nil
    @abort_callback = nil

    partner = @pair
    if partner && partner.respond_to?(:abort) && !partner.aborted?
      Log.medium "Aborting pair stream #{Log.fingerprint self}: #{Log.fingerprint partner}"
      partner.abort exception
    end
  ensure
    close unless closed?
    lock.unlock if lock && lock.locked?
  end
end
#abort_pids ⇒ Object
175 176 177 178 179 180 181 182 183 184 |
# File 'lib/scout/concurrent_stream.rb', line 175
# Sends SIGINT to every pid in @pids, ignoring pids that no longer exist
# (Errno::ESRCH), and then resets @pids to an empty array.
#
# @return [Array] the new, empty pid list
def abort_pids
  targets = @pids || []
  targets.each do |child_pid|
    begin
      Log.low "Killing PID #{child_pid} in ConcurrentStream #{filename}"
      Process.kill :INT, child_pid
    rescue Errno::ESRCH
      # The process is already gone; nothing to signal.
    end
  end
  @pids = []
end
#abort_threads(exception = nil) ⇒ Object
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/scout/concurrent_stream.rb', line 156
# Raises +exception+ (or a fresh Aborted when none is given) inside every
# registered thread and joins it. The current thread and threads already
# flagged with the "aborted" thread-local are skipped. @threads is emptied
# before any thread is touched.
#
# @param exception [Exception, nil] exception to raise in the threads
# @return [Array<Thread>, nil] the threads that were registered, or nil when
#   there were none
def abort_threads(exception = nil)
  return unless @threads and @threads.any?

  label = Log.fingerprint(Thread.current)
  label += " - file:#{filename}" if filename
  Log.low "Aborting threads (#{label}) - #{@threads.map { |thr| Log.fingerprint(thr) }.join(", ")}"

  # Work on a snapshot so the registered list can be cleared up front.
  pending = @threads.dup
  @threads.clear

  pending.each do |thr|
    next if thr == Thread.current
    next if thr["aborted"]

    thr["aborted"] = true
    exception = Aborted.new if exception.nil?
    Log.debug "Aborting thread #{Log.fingerprint(thr)} with exception: #{exception}"
    thr.raise(exception)
    thr.join
  end
end
#aborted? ⇒ Boolean
80 81 82 |
# File 'lib/scout/concurrent_stream.rb', line 80
# Predicate form of the aborted flag.
#
# @return [Object] the raw value of @aborted (nil until it is assigned)
def aborted?
  @aborted
end
#add_callback(&block) ⇒ Object
256 257 258 259 260 261 262 |
# File 'lib/scout/concurrent_stream.rb', line 256
# Appends +block+ to the callback chain: the stored callback becomes a proc
# that first runs the previously registered callback (if any) and then the
# given block.
#
# @yield the code to run after the existing callback
# @return [Proc] the new combined callback
def add_callback(&block)
  previous = callback
  @callback = Proc.new do
    previous && previous.call
    block.call
  end
end
#annotate(stream) ⇒ Object
67 68 69 70 |
# File 'lib/scout/concurrent_stream.rb', line 67
# Sets up +stream+ as a ConcurrentStream carrying this stream's threads,
# pids, callback, abort callback, filename, autojoin flag and lock.
#
# @param stream [Object] the stream to set up
# @return [Object] the same +stream+
def annotate(stream)
  inherited = {
    :threads => threads,
    :pids => pids,
    :callback => callback,
    :abort_callback => abort_callback,
    :filename => filename,
    :autojoin => autojoin,
    :lock => lock
  }
  ConcurrentStream.setup(stream, inherited)
  stream
end
#clear ⇒ Object
72 73 74 |
# File 'lib/scout/concurrent_stream.rb', line 72
# Forgets the registered threads, pids and both callbacks, and resets the
# joined flag, by setting each of those instance variables to nil.
#
# @return [nil]
def clear
  %i[@joined @abort_callback @callback @pids @threads].each do |ivar|
    instance_variable_set(ivar, nil)
  end
  nil
end
#close(*args) ⇒ Object
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/scout/concurrent_stream.rb', line 218
# Closes the stream through the underlying implementation (super).
#
# Without autojoin: does nothing when the stream is already closed, and
# swallows IOError from the underlying close.
#
# With autojoin: a failing close aborts and joins the stream and hands the
# error to stream_raise_exception. Afterwards the stream is joined when no
# stream exception is recorded and it is closed or at EOF.
#
# @param args [Array] forwarded to the underlying #close
# @return [Object, nil]
def close(*args)
  if !autojoin
    return if self.closed?

    begin
      super(*args)
    rescue IOError
      # Deliberately ignored.
    end
  else
    begin
      super(*args)
    rescue
      self.abort
      self.join
      stream_raise_exception $!
    ensure
      finished = !@stream_exception && (self.closed? || self.eof?)
      self.join if finished
    end
  end
end
#join ⇒ Object
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/scout/concurrent_stream.rb', line 138
# Waits for the stream's threads and pids, runs the join callback and closes
# the stream if it is still open. If a stream exception has been recorded
# after the threads and pids are joined, it is raised instead of running the
# callback.
#
# In every case the stream is marked as joined and the lock (if held) is
# released; errors from unlocking are reported through Log.exception. A
# recorded stream exception is raised again from the cleanup.
#
# @return [Object, nil]
# @raise [Exception] the recorded stream exception, when there is one
def join
  join_threads
  join_pids

  failure = stream_exception
  raise failure if failure

  join_callback
  close unless closed?
ensure
  @joined = true
  begin
    lock.unlock if lock && lock.locked?
  rescue
    Log.exception $!
  end
  pending = stream_exception
  raise pending if pending
end
#join_callback ⇒ Object
128 129 130 131 132 133 134 135 136 |
# File 'lib/scout/concurrent_stream.rb', line 128
# Runs the stored callback once: only when one is set and the stream has not
# been joined. The callback is discarded afterwards, even if it raises.
#
# @return [Object, nil] the callback's result, or nil when it was not run
def join_callback
  return if !@callback || joined?

  begin
    @callback.call
  ensure
    @callback = nil
  end
end
#join_pids ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/scout/concurrent_stream.rb', line 115
# Waits for every pid in @pids (Process.waitpid with WUNTRACED). A pid whose
# status is not successful is reported through stream_raise_exception as a
# ConcurrentStreamProcessFailed, unless no_fail is set. Pids that are not
# children of this process (Errno::ECHILD) are ignored. The pid list is
# emptied afterwards.
#
# @return [Array, nil] the new empty pid list, or nil when there was nothing
#   to wait for
def join_pids
  return unless @pids and @pids.any?

  @pids.each do |child_pid|
    begin
      Process.waitpid(child_pid, Process::WUNTRACED)
      tolerated = $?.success? || no_fail
      unless tolerated
        stream_raise_exception ConcurrentStreamProcessFailed.new(child_pid, "Error in waitpid", self)
      end
    rescue Errno::ECHILD
      # Not one of our children (or already reaped); nothing to wait for.
    end
  end
  @pids = []
end
#join_threads ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/scout/concurrent_stream.rb', line 84
# Joins every registered thread except the current one. When a thread's value
# is a Process::Status that is not successful (and no_fail is not set), a
# ConcurrentStreamProcessFailed is raised for it. Any exception from joining
# is either logged and ignored (no_fail) or logged and handed to
# stream_raise_exception. The thread list is emptied afterwards.
#
# @return [Array] the new, empty thread list
def join_threads
  (@threads || []).each do |worker|
    next if worker == Thread.current

    begin
      worker.join
      status = worker.value
      if Process::Status === status && !(status.success? || no_fail)
        msg =
          if log
            "Error joining #{self.filename || self.inspect}. Last log line: #{log}"
          else
            "Error joining #{self.filename || self.inspect}"
          end
        raise ConcurrentStreamProcessFailed.new worker.pid, msg, self
      end
    rescue Exception
      if no_fail
        Log.low "Not failing on exception joining thread in ConcurrenStream - #{filename} - #{$!.message}"
      else
        Log.low "Exception joining thread in ConcurrenStream #{Log.fingerprint self} - #{Log.fingerprint worker} - #{$!.message}"
        stream_raise_exception $!
      end
    end
  end
  @threads = []
end
#joined? ⇒ Boolean
76 77 78 |
# File 'lib/scout/concurrent_stream.rb', line 76
# Predicate form of the joined flag.
#
# @return [Object] the raw value of @joined (nil until it is assigned)
def joined?
  @joined
end
#read(*args) ⇒ Object
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 |
# File 'lib/scout/concurrent_stream.rb', line 237
# Reads through the underlying implementation (super). An exception from the
# read is remembered as the stream exception (the first one wins) and that
# remembered exception is raised.
#
# After a read with no stream exception recorded, an autojoin stream that is
# still open is closed once it reports EOF. If the EOF check itself fails,
# the stream is aborted with that error, which is then re-raised.
#
# @param args [Array] forwarded to the underlying #read
# @return [Object] whatever the underlying #read returns
def read(*args)
  super(*args)
rescue Exception => error
  @stream_exception ||= error
  raise @stream_exception
ensure
  wants_autoclose = !@stream_exception && autojoin && !closed?
  if wants_autoclose
    begin
      done = eof?
    rescue Exception => eof_error
      self.abort(eof_error)
      raise eof_error
    end
    close if done
  end
end
#stream_raise_exception(exception) ⇒ Object
264 265 266 267 268 269 270 |
# File 'lib/scout/concurrent_stream.rb', line 264
# Records +exception+ as the stream exception, raises it inside every
# registered thread and then aborts the stream.
#
# @param exception [Exception] the error to record and propagate
# @return [Object] whatever #abort returns
def stream_raise_exception(exception)
  self.stream_exception = exception

  # The thread list is nil after #clear (or before setup); the other thread
  # helpers tolerate that, so do the same here instead of failing with
  # NoMethodError before the abort can run.
  (threads || []).each do |thread|
    thread.raise exception
  end

  self.abort
end