Class: Spark::ExternalSorter

Inherits:
Object
Includes:
Helper::System
Defined in:
lib/spark/sort.rb

Constant Summary

MEMORY_RESERVE =

Items held by the GC cannot be destroyed on demand, so #make_parts needs some reserve

50
MEMORY_FREE_CHUNK =

How big the chunk for adding new memory will be, because the GC does not immediately clean up unreferenced variables

10
START_SLICE_SIZE =

How many items will be evaluated from the iterator at the start

10
MAX_SLICE_SIZE =

Maximum slice size. A large value effectively disables memory control.

10_000
EVAL_N_VALUES =

How many values will be taken from each enumerator.

10
KEY_FUNCTION =

Default key function

lambda{|item| item}
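
As a small illustration, KEY_FUNCTION is the identity, so items are compared as-is; #sort_by accepts a custom key function instead (the lambda below is a hypothetical example, not part of the class):

KEY_FUNCTION[42]                         # => 42 (items are compared directly)
by_key = lambda { |pair| pair.first }    # hypothetical: sort [key, value] pairs by key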

Instance Attribute Summary

Instance Method Summary

Methods included from Helper::System

included

Constructor Details

#initialize(total_memory, serializer) ⇒ ExternalSorter

Returns a new instance of ExternalSorter.



# File 'lib/spark/sort.rb', line 61

def initialize(total_memory, serializer)
  @total_memory = total_memory
  @memory_limit = total_memory * (100-MEMORY_RESERVE)    / 100
  @memory_chunk = total_memory * (100-MEMORY_FREE_CHUNK) / 100
  @serializer   = serializer
end
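
A minimal construction sketch (the memory figure is hypothetical and my_serializer is a placeholder for a serializer object built elsewhere in the gem):

total_memory = 4_096   # hypothetical amount of available memory, in the caller's unit
sorter = Spark::ExternalSorter.new(total_memory, my_serializer)
sorter.memory_limit    # => 2_048 (total_memory * (100 - MEMORY_RESERVE) / 100)
sorter.memory_chunk    # => 3_686 (total_memory * (100 - MEMORY_FREE_CHUNK) / 100)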

Instance Attribute Details

#memory_chunk ⇒ Object (readonly)

Returns the value of attribute memory_chunk.



# File 'lib/spark/sort.rb', line 59

def memory_chunk
  @memory_chunk
end

#memory_limit ⇒ Object (readonly)

Returns the value of attribute memory_limit.



# File 'lib/spark/sort.rb', line 59

def memory_limit
  @memory_limit
end

#serializer ⇒ Object (readonly)

Returns the value of attribute serializer.



# File 'lib/spark/sort.rb', line 59

def serializer
  @serializer
end

#total_memory ⇒ Object (readonly)

Returns the value of attribute total_memory.



# File 'lib/spark/sort.rb', line 59

def total_memory
  @total_memory
end

Instance Method Details

#add_memory! ⇒ Object



# File 'lib/spark/sort.rb', line 68

def add_memory!
  @memory_limit += memory_chunk
end
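
#add_memory! is meant for the case where memory usage still exceeds the limit because the GC has not yet reclaimed unreferenced items; it simply raises the limit by one chunk. Continuing the hypothetical sorter from the constructor example above:

sorter.memory_limit   # => 2_048
sorter.add_memory!
sorter.memory_limit   # => 5_734 (2_048 + memory_chunk of 3_686)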

#sort_by(iterator, ascending = true, key_function = KEY_FUNCTION) ⇒ Object



# File 'lib/spark/sort.rb', line 72

def sort_by(iterator, ascending=true, key_function=KEY_FUNCTION)
  return to_enum(__callee__, iterator, ascending, key_function) unless block_given?

  create_temp_folder
  internal_sorter = Spark::InternalSorter.get(ascending, key_function)

  # Make N sorted enumerators
  parts = make_parts(iterator, internal_sorter)

  return [] if parts.empty?

  # A new key function is needed because items now have a new structure:
  # from [1, 2, 3] to [[1, Enumerator], [2, Enumerator], [3, Enumerator]]
  key_function_with_enum = lambda{|(key, _)| key_function[key]}
  internal_sorter = Spark::InternalSorter.get(ascending, key_function_with_enum)

  heap  = []
  enums = []

  # Load first items to heap
  parts.each do |part|
    EVAL_N_VALUES.times {
      begin
        heap << [part.next, part]
      rescue StopIteration
        break
      end
    }
  end

  # Parts can already be exhausted while the heap still holds items
  while parts.any? || heap.any?
    internal_sorter.sort(heap)

    # Since every part is sorted and the heap was filled with EVAL_N_VALUES
    # items from each part, EVAL_N_VALUES items can be emitted to the result
    EVAL_N_VALUES.times {
      break if heap.empty?

      item, enum = heap.shift
      enums << enum

      yield item
    }

    # Refill the heap from the parts whose items were just yielded
    while (enum = enums.shift)
      begin
        heap << [enum.next, enum]
      rescue StopIteration
        parts.delete(enum)
        enums.delete(enum)
      end
    end
  end

ensure
  destroy_temp_folder
end
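
A hedged usage sketch of #sort_by (my_serializer, the memory figure, and process are placeholders; a temporary folder is created for spilled parts and removed in the ensure block):

sorter = Spark::ExternalSorter.new(4_096, my_serializer)

# Block form: items are yielded one by one in sorted order
sorter.sort_by((1..100_000).to_a.shuffle.each) { |item| process(item) }

# Without a block an Enumerator is returned; a custom key function sorts by derived keys
by_length = sorter.sort_by(%w[pear fig banana].each, true, lambda { |s| s.length })
by_length.to_a   # => ["fig", "pear", "banana"]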