Class: Spark::Serializer::Batched

Inherits:
  Base
    • Object
Defined in:
lib/spark/serializer/batched.rb

Direct Known Subclasses

AutoBatched

Instance Attribute Summary

Instance Method Summary

Methods inherited from Base

#==, #check_each, #error, #inspect, #load_from_file

Constructor Details

#initialize(serializer, batch_size = nil) ⇒ Batched

Returns a new instance of Batched.



# File 'lib/spark/serializer/batched.rb', line 7

def initialize(serializer, batch_size=nil)
  batch_size ||= Spark::Serializer::DEFAULT_BATCH_SIZE

  @serializer = serializer
  @batch_size = batch_size.to_i

  error('Batch size must be greater than 0') if @batch_size < 1
end
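
A minimal construction sketch (not from the source): Spark::Serializer::Marshal is assumed here as the inner serializer, and the gem is assumed to load via require 'ruby-spark'; any object responding to #dump and #load should work in its place.

require 'ruby-spark'

marshal = Spark::Serializer::Marshal.new
batched = Spark::Serializer::Batched.new(marshal, 512)  # marshal items in groups of 512
default = Spark::Serializer::Batched.new(marshal)       # falls back to DEFAULT_BATCH_SIZE

# A non-positive size is rejected via the inherited #error helper:
# Spark::Serializer::Batched.new(marshal, 0)  # => raises "Batch size must be greater than 0"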

Instance Attribute Details

#serializer=(value) ⇒ Object (writeonly)

Sets the attribute serializer

Parameters:

  • value

    the value to set the attribute serializer to.



# File 'lib/spark/serializer/batched.rb', line 5

def serializer=(value)
  @serializer = value
end
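
The writer allows swapping the inner serializer after construction; a sketch with a hypothetical replacement (any object responding to #dump and #load):

batched.serializer = other_serializer  # hypothetical; e.g. another Spark::Serializer instance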

Instance Method Details

#batched? ⇒ Boolean

True when items are really grouped into batches, i.e. the batch size is greater than 1.

Returns:

  • (Boolean)


# File 'lib/spark/serializer/batched.rb', line 17

def batched?
  @batch_size > 1
end
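
A quick illustration, reusing the assumed marshal serializer from above:

Spark::Serializer::Batched.new(marshal, 2).batched?  # => true
Spark::Serializer::Batched.new(marshal, 1).batched?  # => false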

#dump(data) ⇒ Object



# File 'lib/spark/serializer/batched.rb', line 29

def dump(data)
  @serializer.dump(data)
end
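
#dump delegates straight to the wrapped serializer, so with the assumed Marshal inner serializer the bytes match a direct call:

batched.dump([1, 2, 3]) == marshal.dump([1, 2, 3])  # => true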

#dump_to_io(data, io) ⇒ Object

Writes data to io, serializing items in batches of @batch_size when batching is enabled; each serialized batch is written as a length-prefixed string.



# File 'lib/spark/serializer/batched.rb', line 44

def dump_to_io(data, io)
  check_each(data)

  if batched?
    data = data.each_slice(@batch_size)
  end

  data.each do |item|
    serialized = dump(item)
    io.write_string(serialized)
  end

  io.flush
end
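
A round-trip sketch. Note that write_string and read_int_or_eof are not plain Ruby IO methods; they are assumed to come from ruby-spark's IO core extensions (also mixed into StringIO), so this only runs with the gem loaded:

require 'stringio'

io = StringIO.new
batched = Spark::Serializer::Batched.new(marshal, 2)
batched.dump_to_io((1..5).to_a, io)  # three length-prefixed records: [1, 2], [3, 4], [5]
io.rewind
batched.load_from_io(io).to_a        # => [1, 2, 3, 4, 5]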

#load(data) ⇒ Object



# File 'lib/spark/serializer/batched.rb', line 25

def load(data)
  @serializer.load(data)
end

#load_from_io(io) ⇒ Object

Reads length-prefixed records from io until DATA_EOF, deserializes each one, and yields the resulting items; returns an Enumerator when no block is given.



# File 'lib/spark/serializer/batched.rb', line 62

def load_from_io(io)
  return to_enum(__callee__, io) unless block_given?

  loop do
    size = io.read_int_or_eof
    break if size == Spark::Constant::DATA_EOF

    data = io.read(size)
    data = load(data)

    if batched?
      data.each{|item| yield item }
    else
      yield data
    end
  end
end
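
Because of the to_enum guard, calling #load_from_io without a block returns an Enumerator, so a consumer can stop early; a sketch reusing the io from the previous example:

io.rewind
enum = batched.load_from_io(io)  # => Enumerator; nothing has been read yet
enum.first(3)                    # => [1, 2, 3] -- reads only the batches it needs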

#name ⇒ Object



# File 'lib/spark/serializer/batched.rb', line 33

def name
  "Batched(#{@batch_size})"
end

#to_s ⇒ Object



# File 'lib/spark/serializer/batched.rb', line 37

def to_s
  "#{name} -> #{@serializer}"
end
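
The two helpers compose; assuming the inner serializer's #to_s returns "Marshal":

batched = Spark::Serializer::Batched.new(marshal, 1024)
batched.name  # => "Batched(1024)"
batched.to_s  # => "Batched(1024) -> Marshal"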

#unbatch! ⇒ Object



# File 'lib/spark/serializer/batched.rb', line 21

def unbatch!
  @batch_size = 1
end
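
Forcing the batch size to 1 turns the wrapper into a pass-through for subsequent dumps and loads:

batched.unbatch!
batched.batched?  # => false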