Class: RbbtProcessQueue
- Inherits:
-
Object
- Object
- RbbtProcessQueue
- Defined in:
- lib/rbbt/util/concurrency/processes.rb,
lib/rbbt/util/concurrency/processes/socket.rb,
lib/rbbt/util/concurrency/processes/worker.rb
Defined Under Namespace
Classes: RbbtProcessQueueWorker, RbbtProcessSocket
Instance Attribute Summary collapse
-
#callback(&block) ⇒ Object
Returns the value of attribute callback.
-
#callback_queue ⇒ Object
Returns the value of attribute callback_queue.
-
#callback_thread ⇒ Object
Returns the value of attribute callback_thread.
-
#cleanup ⇒ Object
Returns the value of attribute cleanup.
-
#join ⇒ Object
Returns the value of attribute join.
-
#num_processes ⇒ Object
Returns the value of attribute num_processes.
-
#process_monitor ⇒ Object
Returns the value of attribute process_monitor.
-
#processes ⇒ Object
Returns the value of attribute processes.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#reswpan ⇒ Object
Returns the value of attribute reswpan.
Class Method Summary collapse
Instance Method Summary collapse
- #abort ⇒ Object
- #clean ⇒ Object
- #close_callback ⇒ Object
- #init(&block) ⇒ Object
-
#initialize(num_processes, cleanup = nil, join = nil, reswpan = nil) ⇒ RbbtProcessQueue
constructor
A new instance of RbbtProcessQueue.
- #process(*e) ⇒ Object
Constructor Details
#initialize(num_processes, cleanup = nil, join = nil, reswpan = nil) ⇒ RbbtProcessQueue
Returns a new instance of RbbtProcessQueue.
7 8 9 10 11 12 13 14 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 7
#
# Sets up the queue state; no worker is started here (see #init).
#
# @param num_processes [Integer] how many workers #init will create
# @param cleanup [Object, nil] stored in @cleanup and handed to every worker by #init
# @param join [Object, nil] stored in @join
# @param reswpan [Object, nil] respawn setting (the parameter keeps its
#   historical spelling); handed to every worker by #init
# @return [RbbtProcessQueue]
def initialize(num_processes, cleanup = nil, join = nil, reswpan = nil)
  @num_processes = num_processes
  @processes = []
  @cleanup = cleanup
  @join = join
  # #init reads @respawn while the public `reswpan` reader returns @reswpan.
  # Populate both so the reader no longer always answers nil.
  @respawn = @reswpan = reswpan
  @queue = RbbtProcessSocket.new
end
Instance Attribute Details
#callback(&block) ⇒ Object
Returns the value of attribute callback.
16 17 18 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 16
#
# Attribute reader for +callback+.
#
# @return [Object] the value currently held in @callback (nil when unset)
def callback
  @callback
end
#callback_queue ⇒ Object
Returns the value of attribute callback_queue.
16 17 18 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 16
#
# Attribute reader for +callback_queue+.
#
# @return [Object] the value currently held in @callback_queue (nil when unset)
def callback_queue
  @callback_queue
end
#callback_thread ⇒ Object
Returns the value of attribute callback_thread.
16 17 18 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 16
#
# Attribute reader for +callback_thread+.
#
# @return [Object] the value currently held in @callback_thread (nil when unset)
def callback_thread
  @callback_thread
end
#cleanup ⇒ Object
Returns the value of attribute cleanup.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 6
#
# Attribute reader for +cleanup+.
#
# @return [Object] the value currently held in @cleanup (nil when unset)
def cleanup
  @cleanup
end
#join ⇒ Object
Returns the value of attribute join.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 6
#
# Attribute reader for +join+.
#
# @return [Object] the value currently held in @join (nil when unset)
def join
  @join
end
#num_processes ⇒ Object
Returns the value of attribute num_processes.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 6
#
# Attribute reader for +num_processes+.
#
# @return [Object] the value currently held in @num_processes (nil when unset)
def num_processes
  @num_processes
end
#process_monitor ⇒ Object
Returns the value of attribute process_monitor.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 6
#
# Attribute reader for +process_monitor+.
#
# @return [Object] the value currently held in @process_monitor (nil when unset)
def process_monitor
  @process_monitor
end
#processes ⇒ Object
Returns the value of attribute processes.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 6
#
# Attribute reader for +processes+.
#
# @return [Object] the value currently held in @processes (nil when unset)
def processes
  @processes
end
#queue ⇒ Object
Returns the value of attribute queue.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 6
#
# Attribute reader for +queue+.
#
# @return [Object] the value currently held in @queue (nil when unset)
def queue
  @queue
end
#reswpan ⇒ Object
Returns the value of attribute reswpan.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 6
#
# Attribute reader for +reswpan+ (the attribute keeps its historical spelling).
#
# @return [Object] the value currently held in @reswpan (nil when unset)
def reswpan
  @reswpan
end
Class Method Details
.each(list, num = 3, &block) ⇒ Object
143 144 145 146 147 148 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 143
#
# Convenience wrapper: builds a queue with +num+ workers, initialises it with
# the given block, feeds it every element of +list+ and then joins it.
#
# @param list [#each] elements to submit, one #process call per element
# @param num [Integer] number of worker processes (defaults to 3)
# @yield the block handed to #init
# @return [Object] whatever the queue's #join returns
def self.each(list, num = 3, &block)
  queue = RbbtProcessQueue.new(num)
  queue.init(&block)
  list.each { |elem| queue.process(elem) }
  queue.join
end
Instance Method Details
#abort ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 123
#
# Interrupts the process monitor and the callback thread (when present and
# alive) by raising Aborted inside each and waiting for it, then always calls
# #join, ignoring a ProcessFailed raised by that final join.
#
# @return [Object, nil] result of the callback thread's join, or nil when the
#   callback thread was absent or not alive
def abort
  if @process_monitor && @process_monitor.alive?
    @process_monitor.raise(Aborted.new)
    @process_monitor.join
  end

  if @callback_thread && @callback_thread.alive?
    @callback_thread.raise(Aborted.new)
    @callback_thread.join
  end
ensure
  # Runs even when one of the joins above raised.
  begin
    join
  rescue ProcessFailed
    # A failed worker is expected while aborting; nothing more to do.
  end
end
#clean ⇒ Object
113 114 115 116 117 118 119 120 121 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 113
#
# Shuts the queue down: when the process monitor or the callback thread is
# still alive it aborts and joins first, then cleans @queue and
# @callback_queue if they are set.
#
# @return [Object, nil] result of @callback_queue.clean, or nil when there is
#   no callback queue
def clean
  threads_alive = (@process_monitor && @process_monitor.alive?) ||
                  (@callback_thread && @callback_thread.alive?)

  if threads_alive
    # Explicit receiver: this is the queue's own #abort, not Kernel#abort.
    self.abort
    self.join
  end

  @queue.clean if @queue
  @callback_queue.clean if @callback_queue
end
#close_callback ⇒ Object
80 81 82 83 84 85 86 87 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 80
#
# Signals the callback thread to finish by pushing a ClosedStream marker onto
# the callback queue (only while the thread is alive), then waits for the
# thread. A failure while pushing is logged as a warning and does not prevent
# the join.
#
# @return [Object] result of @callback_thread.join
def close_callback
  begin
    @callback_queue.push ClosedStream.new if @callback_thread.alive?
  rescue Exception => e
    # Exception (not just StandardError) is rescued, as before, so that the
    # join below is always attempted.
    Log.warn "Error closing callback: #{e.message}"
  end

  # Joined unconditionally, whether or not the thread is still alive.
  @callback_thread.join
end
#init(&block) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 50
#
# Creates +num_processes+ workers that share @queue and @callback_queue,
# closes the read end of @queue on this side, and starts a monitor thread that
# joins the workers one after another.
#
# Monitor behaviour:
# * normal run: each worker is joined and then removed from @processes;
# * Aborted raised in the monitor: every remaining worker is aborted and
#   joined, ignoring ProcessFailed;
# * any other exception: it is logged, the remaining workers are aborted, the
#   exception is forwarded to a live callback thread and re-raised in the
#   monitor thread.
#
# @yield the block handed to every RbbtProcessQueueWorker
# @return [Thread] the monitor thread (also stored in @process_monitor)
def init(&block)
  num_processes.times do
    @processes << RbbtProcessQueueWorker.new(@queue, @callback_queue, @cleanup, @respawn, &block)
  end
  @queue.close_read

  @process_monitor = Thread.new(Thread.current) do |_parent|
    begin
      # Always wait on the head of the list; drop it once it has finished.
      while @processes.any?
        @processes[0].join
        @processes.shift
      end
    rescue Aborted
      Log.warn "Aborting process monitor"
      @processes.each { |worker| worker.abort }
      @processes.each do |worker|
        begin
          worker.join
        rescue ProcessFailed
          # Expected for workers that were just aborted.
        end
      end
    rescue Exception => e
      Log.warn "Process monitor exception: #{e.message}"
      @processes.each { |worker| worker.abort }
      @callback_thread.raise e if @callback_thread && @callback_thread.alive?
      raise e
    end
  end
end
#process(*e) ⇒ Object
135 136 137 138 139 140 141 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 135
#
# Submits one unit of work: the arguments are pushed onto @queue as a single
# array.
#
# @param e [Array<Object>] the arguments that make up the work item
# @return [Object] whatever @queue.push returns
# @raise [Aborted] when the push fails with Errno::EPIPE
def process(*e)
  @queue.push e
rescue Errno::EPIPE
  raise Aborted
end