Class: Tsuga::Service::Clusterer
Inherits: Object
Defined in: lib/tsuga/service/clusterer.rb
Defined Under Namespace
Modules: SteppedProgressBar Classes: MutableSet, WriteQueue
Constant Summary
Instance Attribute Summary
-
#_adapter ⇒ Object
readonly
Returns the value of attribute _adapter.
-
#_queue ⇒ Object
readonly
Returns the value of attribute _queue.
-
#_source ⇒ Object
readonly
Returns the value of attribute _source.
Instance Method Summary
-
#initialize(source: nil, adapter: nil) ⇒ Clusterer
constructor
A new instance of Clusterer.
- #run ⇒ Object
Constructor Details
#initialize(source: nil, adapter: nil) ⇒ Clusterer
Returns a new instance of Clusterer.
15 16 17 18 19 |
# File 'lib/tsuga/service/clusterer.rb', line 15 def initialize(source: nil, adapter: nil) @_source = source @_adapter = adapter @_queue = WriteQueue.new(adapter: adapter) end |
Instance Attribute Details
#_adapter ⇒ Object (readonly)
Returns the value of attribute _adapter.
13 14 15 |
# File 'lib/tsuga/service/clusterer.rb', line 13 def _adapter @_adapter end |
#_queue ⇒ Object (readonly)
Returns the value of attribute _queue.
13 14 15 |
# File 'lib/tsuga/service/clusterer.rb', line 13 def _queue @_queue end |
#_source ⇒ Object (readonly)
Returns the value of attribute _source.
13 14 15 |
# File 'lib/tsuga/service/clusterer.rb', line 13 def _source @_source end |
Instance Method Details
#run ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/tsuga/service/clusterer.rb', line 21 def run # delete all clusters _adapter.delete_all # create lowest-level clusters _source.find_each do |record| _queue.push _adapter.build_from(Tsuga::MAX_DEPTH, record) end _queue.flush # for all depths N from 18 to 3 (Tsuga::MAX_DEPTH-1).downto(Tsuga::MIN_DEPTH) do |depth| progress.log "depth #{depth}" if VERBOSE progress.title = "#{depth}.0" if VERBOSE # create clusters at this level from children # TODO: use a save queue, only run saves if > 100 clusters to write cluster_ids = Set.new _adapter.at_depth(depth+1).find_each do |child| _queue.push _adapter.build_from(depth, child) end _queue.flush cluster_ids = MutableSet.new(_adapter.at_depth(depth).collect_ids) if cluster_ids.empty? progress.log "nothing to cluster" if VERBOSE break end # TODO: group points to cluster by tile, and run on tiles in parallel. progress.title = "#{depth}.1" if VERBOSE progress.log "started with #{cluster_ids.length} clusters" if VERBOSE progress.set_phase(depth, 1, cluster_ids.length) if VERBOSE while cluster_ids.any? progress.set_progress(cluster_ids.length) if VERBOSE cluster = _adapter.find_by_id(cluster_ids.first) raise 'internal error: cluster was already removed' if cluster.nil? tile = Tile.including(cluster, depth: depth) clusters = _adapter.in_tile(*tile.neighbours).to_a processed_cluster_ids = clusters.collect(&:id) # clusters we aggregate in this loop iteration # they are _not_ the same as what we pass to the aggregator, # just those inside the fence fenced_cluster_ids = _adapter.in_tile(tile).collect_ids raise RuntimeError, 'no cluster in fence' if fenced_cluster_ids.empty? 
Aggregator.new(clusters:clusters, ratio:PROXIMITY_RATIO, fence:tile).tap do |aggregator| aggregator.run if VERBOSE progress.log("aggregator: %4d left, %2d processed, %2d in fence, %2d updated, %2d dropped" % [ cluster_ids.length, processed_cluster_ids.length, fenced_cluster_ids.length, aggregator.updated_clusters.length, aggregator.dropped_clusters.length]) if aggregator.updated_clusters.any? progress.log("updated: #{aggregator.updated_clusters.collect(&:id).join(', ')}") end if aggregator.dropped_clusters.any? progress.log("dropped: #{aggregator.dropped_clusters.collect(&:id).join(', ')}") end end cluster_ids.remove! fenced_cluster_ids # updated clusters may need to be reprocessed (they might have fallen close enough to tile edges) # TODO: as further optimisation, do not mark for reprocessing clusters that are still inside the fence cluster_ids.merge! aggregator.updated_clusters.collect(&:id) # destroyed clusters may include some on the outer fringe of the fence tile cluster_ids.remove! aggregator.dropped_clusters.collect(&:id) aggregator.dropped_clusters.each(&:destroy) _adapter.mass_update(aggregator.updated_clusters) end if RUN_SANITY_CHECK # sanity check: all <cluster_ids> should exist not_removed = cluster_ids - _adapter.at_depth(depth).collect_ids if not_removed.any? 
raise "cluster_ids contains IDs of deleted clusters: #{not_removed.to_a.join(', ')}" end # sanity check: sum of weights should match that of lower level deeper_weight = _adapter.at_depth(depth+1).sum(:weight) this_weight = _adapter.at_depth(depth).sum(:weight) if deeper_weight != this_weight raise "mismatch between weight at this depth (#{this_weight}) and deeper level (#{deeper_weight})" end end end # set parent_id in the whole tree # this is made slightly more complicated by #find_each's scoping progress.title = "#{depth}.2" if VERBOSE child_mappings = {} _adapter.at_depth(depth).find_each do |cluster| cluster.children_ids.each do |child_id| child_mappings[child_id] = cluster.id end end child_mappings.each_pair do |child_id, parent_id| cluster = _adapter.find_by_id(child_id) cluster.parent_id = parent_id _queue.push cluster end _queue.flush end progress.finish if VERBOSE end |