Method: Aerospike::BatchOperateCommand#write_buffer

Defined in:
lib/aerospike/command/batch_operate_command.rb

#write_bufferObject

[View source]

41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/aerospike/command/batch_operate_command.rb', line 41

def write_buffer
  field_count = 1

  exp_size = estimate_expression_size(@policy.filter_exp)
  @data_offset += exp_size
  field_count += 1 if exp_size > 0

  @data_buffer.reset
  begin_cmd
  @data_offset += FIELD_HEADER_SIZE + 4 + 1 # batch.keys.length + flags

  prev = nil
  @records.each do |record|
    key = record.key
    @data_offset += key.digest.length + 4 # 4 byte batch offset

    if !@policy.send_key && !prev.nil? && prev.key.namespace == key.namespace && prev.key.set_name == key.set_name && record == prev
      @data_offset += 1
    else
      @data_offset += 12
      @data_offset += key.namespace.bytesize + FIELD_HEADER_SIZE
      @data_offset += key.set_name.bytesize + FIELD_HEADER_SIZE
      @data_offset += record.size
    end

    prev = record
  end
  size_buffer
  write_batch_header(policy, field_count)

  write_filter_exp(@policy.filter_exp, exp_size)

  field_size_offset = @data_offset

  write_field_header(0, Aerospike::FieldType::BATCH_INDEX)
  @data_offset += @data_buffer.write_int32(batch.records.length, @data_offset)
  @data_offset += @data_buffer.write_byte(batch_flags, @data_offset)

  prev = nil
  attr = BatchAttr.new
  batch.records.each_with_index do |record, index|
    @data_offset += @data_buffer.write_int32(index, @data_offset)
    key = record.key
    @data_offset += @data_buffer.write_binary(key.digest, @data_offset)

    if !@policy.send_key && !prev.nil? && prev.key.namespace == key.namespace && prev.key.set_name == key.set_name && record == prev
      @data_offset += @data_buffer.write_byte(BATCH_MSG_REPEAT, @data_offset)
    else
      case record
      when BatchRead
        attr.set_batch_read(record.policy)
        if record.bin_names&.length&.> 0
          write_batch_bin_names(key, record.bin_names, attr, attr.filter_exp)
        elsif record.ops&.length&.> 0
          attr.adjust_read(record.ops)
          write_batch_operations(key, record.ops, attr, attr.filter_exp)
        else
          attr.adjust_read_all_bins(record.read_all_bins)
          write_batch_read(key, attr, attr.filter_exp, 0)
        end

      when BatchWrite
        attr.set_batch_write(record.policy)
        attr.adjust_write(record.ops)
        write_batch_operations(key, record.ops, attr, attr.filter_exp)

      when BatchUDF
        attr.set_batch_udf(record.policy)
        write_batch_write(key, attr, attr.filter_exp, 3, 0)
        write_field_string(record.package_name, Aerospike::FieldType::UDF_PACKAGE_NAME)
        write_field_string(record.function_name, Aerospike::FieldType::UDF_FUNCTION)
        write_field_bytes(record.arg_bytes, Aerospike::FieldType::UDF_ARGLIST)

      when BatchDelete
        attr.set_batch_delete(record.policy)
        write_batch_write(key, attr, attr.filter_exp, 0, 0)
      end

      prev = record
    end
  end

  @data_buffer.write_uint32(@data_offset-MSG_TOTAL_HEADER_SIZE-4, field_size_offset)

  end_cmd
  mark_compressed(@policy)
end