Class: Spark::Serializer::Batched
- Inherits: Base
  - Object
  - Base
  - Spark::Serializer::Batched
- Defined in:
  lib/spark/serializer/batched.rb
Instance Attribute Summary
Instance Method Summary
Methods inherited from Base
#==, #check_each, #error, #inspect, #load_from_file
Constructor Details
#initialize(serializer, batch_size = nil) ⇒ Batched
Returns a new instance of Batched.
# File 'lib/spark/serializer/batched.rb', line 7

def initialize(serializer, batch_size=nil)
  batch_size ||= Spark::Serializer::DEFAULT_BATCH_SIZE

  @serializer = serializer
  @batch_size = batch_size.to_i

  error('Batch size must be greater than 0') if @batch_size < 1
end
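For illustration, a minimal construction sketch. Spark::Serializer::Marshal is assumed here as the wrapped serializer; when batch_size is omitted, it falls back to Spark::Serializer::DEFAULT_BATCH_SIZE.

# Sketch only: assumes Spark::Serializer::Marshal is available as the inner serializer.
inner   = Spark::Serializer::Marshal.new
batched = Spark::Serializer::Batched.new(inner, 3)

batched.batched?   # => true  (batch size is greater than 1)
batched.name       # => "Batched(3)"

# A batch size below 1 is rejected via #error:
# Spark::Serializer::Batched.new(inner, 0)   # raises 'Batch size must be greater than 0'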
Instance Attribute Details
#serializer=(value) ⇒ Object
Sets the attribute serializer
# File 'lib/spark/serializer/batched.rb', line 5

def serializer=(value)
  @serializer = value
end
Instance Method Details
#batched? ⇒ Boolean
# File 'lib/spark/serializer/batched.rb', line 17

def batched?
  @batch_size > 1
end
#dump(data) ⇒ Object
# File 'lib/spark/serializer/batched.rb', line 29

def dump(data)
  @serializer.dump(data)
end
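#dump is pure delegation to the wrapped serializer, so a whole batch (the Array produced by each_slice in #dump_to_io) is serialized as one unit. A small sketch, reusing the Marshal-backed instance assumed above:

batched.dump([1, 2, 3]) == inner.dump([1, 2, 3])   # => true, nothing is added or stripped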
#dump_to_io(data, io) ⇒ Object
Writes data to io. When batching is enabled, the items are first grouped into slices of @batch_size; each slice is serialized with the wrapped serializer and written to io as a single string.
# File 'lib/spark/serializer/batched.rb', line 44

def dump_to_io(data, io)
  check_each(data)

  if batched?
    data = data.each_slice(@batch_size)
  end

  data.each do |item|
    serialized = dump(item)
    io.write_string(serialized)
  end

  io.flush
end
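A minimal sketch of the write path, using a hypothetical in-memory stub that mimics the write_string/flush interface dump_to_io expects (the real io is ruby-spark's socket or file wrapper):

# Hypothetical stub; only the methods dump_to_io calls are provided.
class CollectingIO
  attr_reader :chunks
  def initialize;        @chunks = [];   end
  def write_string(str); @chunks << str; end
  def flush;                             end
end

io = CollectingIO.new
batched.dump_to_io((1..7), io)

# With a batch size of 3, the range is sliced into [1, 2, 3], [4, 5, 6], [7],
# so three serialized strings are written.
io.chunks.size   # => 3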
#load(data) ⇒ Object
# File 'lib/spark/serializer/batched.rb', line 25

def load(data)
  @serializer.load(data)
end
#load_from_io(io) ⇒ Object
Reads serialized data back from io and yields the deserialized items one by one; returns an Enumerator when no block is given.
# File 'lib/spark/serializer/batched.rb', line 62

def load_from_io(io)
  return to_enum(__callee__, io) unless block_given?

  loop do
    size = io.read_int_or_eof
    break if size == Spark::Constant::DATA_EOF

    data = io.read(size)
    data = load(data)

    if batched?
      data.each { |item| yield item }
    else
      yield data
    end
  end
end
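Reading mirrors the write path. A sketch assuming an io object that also implements the read side of ruby-spark's IO protocol (read_int_or_eof and read) over the bytes written above; batches are flattened back into individual items:

# With a block: items are yielded one at a time, regardless of how they were batched.
batched.load_from_io(io) { |item| puts item }

# Without a block: an Enumerator is returned (via to_enum(__callee__, io)).
batched.load_from_io(io).to_a   # => [1, 2, 3, 4, 5, 6, 7]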
#name ⇒ Object
# File 'lib/spark/serializer/batched.rb', line 33

def name
  "Batched(#{@batch_size})"
end
#to_s ⇒ Object
# File 'lib/spark/serializer/batched.rb', line 37

def to_s
  "#{name} -> #{@serializer}"
end
#unbatch! ⇒ Object
# File 'lib/spark/serializer/batched.rb', line 21

def unbatch!
  @batch_size = 1
end
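The effect of #unbatch! follows directly from the code above: it forces the batch size to 1, so #batched? becomes false and each item is subsequently written and read individually.

batched.batched?   # => true
batched.unbatch!   # @batch_size is now 1
batched.batched?   # => false
batched.name       # => "Batched(1)"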