41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
# File 'lib/aerospike/command/batch_operate_command.rb', line 41
# Serializes this batch-operate request into @data_buffer.
#
# Works in two passes:
#   1. A sizing pass over @records that only advances @data_offset, followed
#      by size_buffer.
#   2. A write pass over batch.records that emits the batch header, the
#      optional filter expression, the batch-index field header, the record
#      count and flags, and then one entry per record.
#
# After the write pass the size of the batch-index field is back-patched at
# the offset where its header was started, then end_cmd and mark_compressed
# are invoked. The return value is whatever mark_compressed(@policy) returns.
#
# NOTE(review): the sizing pass iterates @records while the write pass
# iterates batch.records; presumably both refer to the same collection -
# confirm against the constructor.
def write_buffer
  field_count = 1

  # NOTE(review): exp_size is added to @data_offset before begin_cmd runs; if
  # begin_cmd resets @data_offset this addition is discarded - confirm.
  exp_size = estimate_expression_size(@policy.filter_exp)
  @data_offset += exp_size
  field_count += 1 if exp_size > 0

  @data_buffer.reset
  begin_cmd

  # Field header plus the two values written right after it in the write
  # pass: the int32 record count (4) and the flags byte (1).
  @data_offset += FIELD_HEADER_SIZE + 4 + 1

  # True when `record` can be encoded as a one-byte "repeat" marker instead of
  # a full entry: keys are not being sent, and the record equals the previous
  # fully described one with the same namespace and set name.
  repeats_previous = lambda do |prev, record|
    !@policy.send_key &&
      !prev.nil? &&
      prev.key.namespace == record.key.namespace &&
      prev.key.set_name == record.key.set_name &&
      record == prev
  end

  # ---- Pass 1: estimate the buffer size ----
  prev = nil
  @records.each do |record|
    key = record.key
    # Digest plus the int32 record index written before it.
    @data_offset += key.digest.length + 4

    if repeats_previous.call(prev, record)
      @data_offset += 1 # single repeat-marker byte
    else
      # Fixed 12-byte per-record overhead.
      # NOTE(review): presumably the per-record header/ttl/counts - confirm.
      @data_offset += 12
      @data_offset += key.namespace.bytesize + FIELD_HEADER_SIZE
      @data_offset += key.set_name.bytesize + FIELD_HEADER_SIZE
      @data_offset += record.size
    end
    # In this pass `prev` advances on every record, repeated or not.
    prev = record
  end

  size_buffer

  # ---- Pass 2: write the command ----
  write_batch_header(policy, field_count)
  write_filter_exp(@policy.filter_exp, exp_size)

  # Remember where the batch-index field starts so that its size can be
  # patched in once every record has been written.
  field_size_offset = @data_offset
  write_field_header(0, Aerospike::FieldType::BATCH_INDEX)

  @data_offset += @data_buffer.write_int32(batch.records.length, @data_offset)
  @data_offset += @data_buffer.write_byte(batch_flags, @data_offset)

  prev = nil
  attr = BatchAttr.new
  batch.records.each_with_index do |record, index|
    @data_offset += @data_buffer.write_int32(index, @data_offset)
    key = record.key
    @data_offset += @data_buffer.write_binary(key.digest, @data_offset)

    if repeats_previous.call(prev, record)
      @data_offset += @data_buffer.write_byte(BATCH_MSG_REPEAT, @data_offset)
    else
      case record
      when BatchRead
        attr.set_batch_read(record.policy)
        if record.bin_names&.length&.positive?
          write_batch_bin_names(key, record.bin_names, attr, attr.filter_exp)
        elsif record.ops&.length&.positive?
          attr.adjust_read(record.ops)
          write_batch_operations(key, record.ops, attr, attr.filter_exp)
        else
          attr.adjust_read_all_bins(record.read_all_bins)
          write_batch_read(key, attr, attr.filter_exp, 0)
        end
      when BatchWrite
        attr.set_batch_write(record.policy)
        attr.adjust_write(record.ops)
        write_batch_operations(key, record.ops, attr, attr.filter_exp)
      when BatchUDF
        attr.set_batch_udf(record.policy)
        # 3 extra fields: package name, function name and argument list.
        write_batch_write(key, attr, attr.filter_exp, 3, 0)
        write_field_string(record.package_name, Aerospike::FieldType::UDF_PACKAGE_NAME)
        write_field_string(record.function_name, Aerospike::FieldType::UDF_FUNCTION)
        write_field_bytes(record.arg_bytes, Aerospike::FieldType::UDF_ARGLIST)
      when BatchDelete
        attr.set_batch_delete(record.policy)
        write_batch_write(key, attr, attr.filter_exp, 0, 0)
      end
      # In this pass `prev` only advances when a full entry was written.
      prev = record
    end
  end

  # Back-patch the batch-index field size at the position recorded above.
  @data_buffer.write_uint32(@data_offset - MSG_TOTAL_HEADER_SIZE - 4, field_size_offset)

  end_cmd
  mark_compressed(@policy)
end
|