Class: RbbtProcessQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/util/concurrency/processes.rb,
lib/rbbt/util/concurrency/processes/socket.rb,
lib/rbbt/util/concurrency/processes/worker.rb

Defined Under Namespace

Classes: RbbtProcessQueueWorker, RbbtProcessSocket

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(num_processes, cleanup = nil, join = nil, reswpan = nil) ⇒ RbbtProcessQueue

Returns a new instance of RbbtProcessQueue.



7
8
9
10
11
12
13
14
# File 'lib/rbbt/util/concurrency/processes.rb', line 7

def initialize(num_processes, cleanup = nil, join = nil, reswpan = nil)
  @num_processes = num_processes
  @processes = []
  @cleanup = cleanup
  @join = join
  @respawn = reswpan
  @queue = RbbtProcessSocket.new
end

Instance Attribute Details

#callback(&block) ⇒ Object

Returns the value of attribute callback.



16
17
18
# File 'lib/rbbt/util/concurrency/processes.rb', line 16

def callback
  @callback
end

#callback_queueObject

Returns the value of attribute callback_queue.



16
17
18
# File 'lib/rbbt/util/concurrency/processes.rb', line 16

def callback_queue
  @callback_queue
end

#callback_threadObject

Returns the value of attribute callback_thread.



16
17
18
# File 'lib/rbbt/util/concurrency/processes.rb', line 16

def callback_thread
  @callback_thread
end

#cleanupObject

Returns the value of attribute cleanup.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def cleanup
  @cleanup
end

#joinObject

Returns the value of attribute join.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def join
  @join
end

#num_processesObject

Returns the value of attribute num_processes.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def num_processes
  @num_processes
end

#process_monitorObject

Returns the value of attribute process_monitor.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def process_monitor
  @process_monitor
end

#processesObject

Returns the value of attribute processes.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def processes
  @processes
end

#queueObject

Returns the value of attribute queue.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def queue
  @queue
end

#reswpanObject

Returns the value of attribute reswpan.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def reswpan
  @reswpan
end

Class Method Details

.each(list, num = 3, &block) ⇒ Object



143
144
145
146
147
148
# File 'lib/rbbt/util/concurrency/processes.rb', line 143

def self.each(list, num = 3, &block)
  q = RbbtProcessQueue.new num
  q.init(&block)
  list.each do |elem| q.process elem end
  q.join
end

Instance Method Details

#abortObject



123
124
125
126
127
128
129
130
131
132
133
# File 'lib/rbbt/util/concurrency/processes.rb', line 123

def abort
  begin
    (@process_monitor.raise(Aborted.new); @process_monitor.join) if @process_monitor and @process_monitor.alive?
    (@callback_thread.raise(Aborted.new); @callback_thread.join) if @callback_thread and @callback_thread.alive?
  ensure
    begin
      join
    rescue ProcessFailed
    end
  end
end

#cleanObject



113
114
115
116
117
118
119
120
121
# File 'lib/rbbt/util/concurrency/processes.rb', line 113

def clean
  if (@process_monitor and @process_monitor.alive?) or (@callback_thread and @callback_thread.alive?)
    self.abort 
    self.join
  end

  @queue.clean if @queue
  @callback_queue.clean if @callback_queue
end

#close_callbackObject



80
81
82
83
84
85
86
87
# File 'lib/rbbt/util/concurrency/processes.rb', line 80

def close_callback
  begin
    @callback_queue.push ClosedStream.new if @callback_thread.alive?
  rescue Exception
    Log.warn "Error closing callback: #{$!.message}"
  end
  @callback_thread.join  #if @callback_thread.alive?
end

#init(&block) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/rbbt/util/concurrency/processes.rb', line 50

def init(&block)
  num_processes.times do |i|
    @processes << RbbtProcessQueueWorker.new(@queue, @callback_queue, @cleanup, @respawn, &block)
  end
  @queue.close_read

  @process_monitor = Thread.new(Thread.current) do |parent|
    begin
      while @processes.any?
        @processes[0].join 
        @processes.shift
      end
    rescue Aborted
      Log.warn "Aborting process monitor"
      @processes.each{|p| p.abort }
      @processes.each{|p| 
        begin
          p.join 
        rescue ProcessFailed
        end
      }
    rescue Exception
      Log.warn "Process monitor exception: #{$!.message}"
      @processes.each{|p| p.abort }
      @callback_thread.raise $! if @callback_thread and @callback_thread.alive?
      raise $!
    end
  end
end

#process(*e) ⇒ Object



135
136
137
138
139
140
141
# File 'lib/rbbt/util/concurrency/processes.rb', line 135

def process(*e)
  begin
    @queue.push e
  rescue Errno::EPIPE
    raise Aborted
  end
end