Class: Aerospike::BatchOperateCommand

Inherits:
MultiCommand show all
Defined in:
lib/aerospike/command/batch_operate_command.rb

Overview

:nodoc:

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from MultiCommand

#compressed?, #get_node, #parse_group, #parse_key, #parse_record, #parse_result, #read_bytes, #skip_key, #stop, #valid?

Methods inherited from Command

#execute, #set_delete, #set_exists, #set_operate, #set_query, #set_read, #set_read_for_key_only, #set_read_header, #set_scan, #set_touch, #set_udf, #set_write, #write_bins

Constructor Details

#initialize(node, batch, policy, records) ⇒ BatchOperateCommand

Returns a new instance of BatchOperateCommand.

[View source]

26
27
28
29
30
31
# File 'lib/aerospike/command/batch_operate_command.rb', line 26

# Creates the command and stores its collaborators in instance variables.
#
# @param node    forwarded unchanged to the superclass constructor
# @param batch   stored in @batch; #write_buffer iterates +batch.records+
# @param policy  stored in @policy; read by #batch_flags and #write_buffer
# @param records stored in @records; #parse_row indexes into it by the
#   batch index read from the response
def initialize(node, batch, policy, records)
  super(node)
  @batch = batch
  @policy = policy
  @records = records
end

Instance Attribute Details

#attr ⇒ Object

Returns the value of attribute attr.


24
25
26
# File 'lib/aerospike/command/batch_operate_command.rb', line 24

# Reader for the @attr instance variable.
#
# NOTE(review): the constructor shown for this class does not assign @attr,
# and #write_buffer works with a local variable named +attr+ rather than
# this instance variable — confirm where (if anywhere) @attr is set.
#
# @return [Object] the current value of @attr (nil when never assigned)
def attr
  @attr
end

#batch ⇒ Object

Returns the value of attribute batch.


24
25
26
# File 'lib/aerospike/command/batch_operate_command.rb', line 24

# Reader for @batch, which the constructor sets from its +batch+ argument.
#
# @return [Object] the batch object given at construction time
def batch
  @batch
end

#policy ⇒ Object

Returns the value of attribute policy.


24
25
26
# File 'lib/aerospike/command/batch_operate_command.rb', line 24

# Reader for @policy, which the constructor sets from its +policy+ argument.
#
# @return [Object] the policy object given at construction time
def policy
  @policy
end

#records ⇒ Object

Returns the value of attribute records.


24
25
26
# File 'lib/aerospike/command/batch_operate_command.rb', line 24

# Reader for @records, which the constructor sets from its +records+ argument.
#
# @return [Object] the record collection given at construction time
def records
  @records
end

Instance Method Details

#batch_flags ⇒ Object

[View source]

33
34
35
36
37
38
39
# File 'lib/aerospike/command/batch_operate_command.rb', line 33

# Computes the flags value for this batch from @policy.
#
# Bit 0x2 is set when +@policy.allow_inline_ssd+ is truthy and bit 0x4 when
# +@policy.respond_all_keys+ is truthy. Bit 0x1 (+@policy.allow_inline+) is
# currently never set; the line that did so is kept below, commented out.
#
# @return [Integer] the OR of the enabled flag bits (0 when none apply)
def batch_flags
  # inline_bit = @policy.allow_inline ? 0x1 : 0
  inline_ssd_bit = @policy.allow_inline_ssd ? 0x2 : 0
  respond_all_bit = @policy.respond_all_keys ? 0x4 : 0

  inline_ssd_bit | respond_all_bit
end

#parse_row(result_code) ⇒ Object

Parse all results in the batch. Add records to shared list. If the record was not found, the bins will be nil.

[View source]

131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/aerospike/command/batch_operate_command.rb', line 131

# Parses one row of the batch response and records the outcome on the
# matching entry of #records.
#
# Generation, expiration, batch index, field count and operation count are
# read from fixed offsets (6, 10, 14, 18 and 20) of @data_buffer. The key
# fields are skipped via skip_key, the entry at the batch index receives
# +result_code+, and a record is parsed and attached only when the result
# code is 0 or ResultCode::UDF_BAD_RESPONSE; otherwise the entry's record
# is left untouched.
#
# @param result_code [Integer] result code of the row being parsed
# @return [Object, nil] the parsed record when one was attached, else nil
def parse_row(result_code)
  generation, expiration, batch_index =
    [6, 10, 14].map { |offset| @data_buffer.read_int32(offset) }
  field_count, op_count =
    [18, 20].map { |offset| @data_buffer.read_int16(offset) }

  skip_key(field_count)

  entry = records[batch_index]
  req_key = entry.key
  entry.result_code = result_code

  # Only these result codes are followed by record data.
  return unless result_code == 0 || result_code == ResultCode::UDF_BAD_RESPONSE

  entry.record = parse_record(req_key, op_count, generation, expiration)
end

#write_buffer ⇒ Object

[View source]

41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/aerospike/command/batch_operate_command.rb', line 41

# Serializes the batch request into @data_buffer.
#
# The method runs in two passes:
# 1. a sizing pass over @records that only advances @data_offset, followed
#    by size_buffer;
# 2. a writing pass over batch.records that emits, for each record, its
#    index, its key digest and then either a repeat marker or the full
#    per-record payload selected by the record's class.
#
# Afterwards the value at field_size_offset is patched, end_cmd is called
# and mark_compressed(@policy) is invoked.
#
# NOTE(review): the sizing pass iterates @records while the writing pass
# iterates batch.records — confirm both always describe the same records.
def write_buffer
  # Starts at 1 — presumably accounting for the BATCH_INDEX field written
  # below; a non-empty filter expression adds one more field.
  field_count = 1

  exp_size = estimate_expression_size(@policy.filter_exp)
  # NOTE(review): this addition happens before begin_cmd, whose effect on
  # @data_offset is not visible here — confirm it is not discarded.
  @data_offset += exp_size
  field_count += 1 if exp_size > 0

  @data_buffer.reset
  begin_cmd
  @data_offset += FIELD_HEADER_SIZE + 4 + 1 # batch.keys.length + flags

  # Sizing pass: mirrors the layout produced by the writing pass below.
  prev = nil
  @records.each do |record|
    key = record.key
    @data_offset += key.digest.length + 4 # 4 byte batch offset

    # A record counts as a repeat of the previous one only when the policy
    # does not send keys, namespace and set name match, and record == prev.
    if !@policy.send_key && !prev.nil? && prev.key.namespace == key.namespace && prev.key.set_name == key.set_name && record == prev
      # Single byte, matching the BATCH_MSG_REPEAT byte written below.
      @data_offset += 1
    else
      # NOTE(review): fixed 12-byte per-record overhead; its breakdown is
      # not visible in this method — confirm against the write_batch_* helpers.
      @data_offset += 12
      @data_offset += key.namespace.bytesize + FIELD_HEADER_SIZE
      @data_offset += key.set_name.bytesize + FIELD_HEADER_SIZE
      @data_offset += record.size
    end

    prev = record
  end
  size_buffer
  write_batch_header(policy, field_count)

  write_filter_exp(@policy.filter_exp, exp_size)

  # Remember where the BATCH_INDEX field header starts; a uint32 is written
  # back at this offset at the end (presumably the field's final size).
  field_size_offset = @data_offset

  write_field_header(0, Aerospike::FieldType::BATCH_INDEX)
  @data_offset += @data_buffer.write_int32(batch.records.length, @data_offset)
  @data_offset += @data_buffer.write_byte(batch_flags, @data_offset)

  # Writing pass.
  prev = nil
  # One BatchAttr instance is reused for every record. This is a local
  # variable; it is distinct from the @attr instance variable.
  attr = BatchAttr.new
  batch.records.each_with_index do |record, index|
    @data_offset += @data_buffer.write_int32(index, @data_offset)
    key = record.key
    @data_offset += @data_buffer.write_binary(key.digest, @data_offset)

    # Same repeat condition as in the sizing pass.
    if !@policy.send_key && !prev.nil? && prev.key.namespace == key.namespace && prev.key.set_name == key.set_name && record == prev
      @data_offset += @data_buffer.write_byte(BATCH_MSG_REPEAT, @data_offset)
    else
      # Dispatch on the record's class; a record matching none of the
      # branches gets no payload beyond the index and digest written above.
      case record
      when BatchRead
        attr.set_batch_read(record.policy)
        # Safe navigation makes a nil bin_names/ops evaluate to nil (falsy)
        # instead of raising, so nil and empty are handled the same way.
        if record.bin_names&.length&.> 0
          write_batch_bin_names(key, record.bin_names, attr, attr.filter_exp)
        elsif record.ops&.length&.> 0
          attr.adjust_read(record.ops)
          write_batch_operations(key, record.ops, attr, attr.filter_exp)
        else
          attr.adjust_read_all_bins(record.read_all_bins)
          write_batch_read(key, attr, attr.filter_exp, 0)
        end

      when BatchWrite
        attr.set_batch_write(record.policy)
        attr.adjust_write(record.ops)
        write_batch_operations(key, record.ops, attr, attr.filter_exp)

      when BatchUDF
        attr.set_batch_udf(record.policy)
        # The 3 presumably announces the three UDF fields written right
        # after this call (package name, function name, argument list).
        write_batch_write(key, attr, attr.filter_exp, 3, 0)
        write_field_string(record.package_name, Aerospike::FieldType::UDF_PACKAGE_NAME)
        write_field_string(record.function_name, Aerospike::FieldType::UDF_FUNCTION)
        write_field_bytes(record.arg_bytes, Aerospike::FieldType::UDF_ARGLIST)

      when BatchDelete
        attr.set_batch_delete(record.policy)
        write_batch_write(key, attr, attr.filter_exp, 0, 0)
      end

      # Unlike the sizing pass, prev is only updated when a full record was
      # written; on a repeat, record == prev already holds.
      prev = record
    end
  end

  # Patch the uint32 at the saved offset with the number of bytes written
  # since then, relative to MSG_TOTAL_HEADER_SIZE + 4.
  @data_buffer.write_uint32(@data_offset-MSG_TOTAL_HEADER_SIZE-4, field_size_offset)

  end_cmd
  mark_compressed(@policy)
end