Module: MapReduce
- Included in:
- Scrappy::Agent
- Defined in:
- lib/scrappy/agent/map_reduce.rb
Defined Under Namespace
Classes: Queue
Instance Method Summary collapse
Instance Method Details
#cluster ⇒ Object
50 51 52 |
# File 'lib/scrappy/agent/map_reduce.rb', line 50 def cluster @cluster ||= (1..@cluster_count || 1).map { self.class.new(*(@cluster_options || [])) } end |
#process(list) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/scrappy/agent/map_reduce.rb', line 54 def process list results = [] results.extend MonitorMixin queue = Queue.new list.each { |element| queue << element } cluster.map { |obj| Thread.new { obj.work queue, results } }.each { |t| t.join } reduce results end |
#work(queue, results) ⇒ Object
67 68 69 70 71 72 73 74 |
# File 'lib/scrappy/agent/map_reduce.rb', line 67 def work queue, results begin queue.pop do |item| result = map item, queue results.synchronize { results << result } end end until queue.empty? end |