Class: Kafka::FFI::Message::Header

Inherits:
OpaquePointer show all
Defined in:
lib/kafka/ffi/message/header.rb

Instance Attribute Summary

Attributes inherited from OpaquePointer

#pointer

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from OpaquePointer

by_ref, from_native, inherited, #initialize, to_native

Constructor Details

This class inherits a constructor from Kafka::FFI::OpaquePointer

Class Method Details

.new(count = 0) ⇒ Object



8
9
10
11
12
13
14
# File 'lib/kafka/ffi/message/header.rb', line 8

# Build a Header, either by wrapping an existing native pointer or by asking
# librdkafka for a fresh headers list.
#
# @param count [Integer, ::FFI::Pointer] When given an ::FFI::Pointer it is
#   handed to the inherited constructor. Any other value is passed straight
#   to rd_kafka_headers_new (presumably the initial number of header slots
#   to allocate; confirm against librdkafka).
#
# @return [Object] Whatever the inherited constructor or
#   rd_kafka_headers_new returns.
def self.new(count = 0)
  # An existing native pointer is wrapped through the inherited constructor.
  return super(count) if count.is_a?(::FFI::Pointer)

  ::Kafka::FFI.rd_kafka_headers_new(count)
end

Instance Method Details

#add(name, value) ⇒ Object

Add header with name and value.

Parameters:

  • name (String)

    Header key name

  • value (#to_s, nil)

    Header key value

Raises:

  • (Kafka::ResponseError)

    Error that occurred adding the header



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/kafka/ffi/message/header.rb', line 31

# Add header with name and value.
#
# @param name [String] Header key name (converted with #to_s)
# @param value [#to_s, nil] Header key value. nil is passed through as a
#   nil value with a size of 0.
#
# @raise [Kafka::ResponseError] rd_kafka_header_add returned something other
#   than :ok
#
# @return [nil]
def add(name, value)
  name = name.to_s

  value_size = 0
  if value
    value = value.to_s
    value_size = value.bytesize
  end

  # Both sizes are given in bytes. Using the character count for the name
  # would under-report its size whenever it contains multibyte characters.
  err = ::Kafka::FFI.rd_kafka_header_add(self, name, name.bytesize, value, value_size)
  raise ::Kafka::ResponseError, err if err != :ok

  nil
end

#copy ⇒ Header

Make a copy of the headers list

Returns:

  • (Header)

    Copy of the headers



51
52
53
# File 'lib/kafka/ffi/message/header.rb', line 51

# Make a copy of the headers list.
#
# @return [Header] Copy of the headers, as returned by rd_kafka_headers_copy
def copy
  duplicate = ::Kafka::FFI.rd_kafka_headers_copy(self)
  duplicate
end

#count ⇒ Integer Also known as: size, length

Count returns the number of headers in the set.

Returns:

  • (Integer)

    Number of headers



19
20
21
# File 'lib/kafka/ffi/message/header.rb', line 19

# Report how many headers are in the set.
#
# @return [Integer] Number of headers, as reported by rd_kafka_header_cnt
def count
  total = ::Kafka::FFI.rd_kafka_header_cnt(self)
  total
end

#destroy ⇒ Object



199
200
201
202
203
# File 'lib/kafka/ffi/message/header.rb', line 199

# Hand the headers list to rd_kafka_headers_destroy, unless the wrapped
# pointer is already null.
#
# @return [Object, nil] Result of rd_kafka_headers_destroy, or nil when the
#   pointer is null and nothing was done.
def destroy
  unless pointer.null?
    ::Kafka::FFI.rd_kafka_headers_destroy(self)
  end
end

#get(name) ⇒ Array<String, nil>

Retrieve all headers that match the given name

Parameters:

  • name (String)

    Header key name

Returns:

  • (Array<String, nil>)

    List of values for the header

Raises:

  • (Kafka::ResponseError)

    Error that occurred retrieving the header values



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/kafka/ffi/message/header.rb', line 86

# Retrieve all headers that match the given name.
#
# @param name [String] Header key name (converted with #to_s)
#
# @raise [Kafka::ResponseError] rd_kafka_header_get returned something other
#   than :ok or RD_KAFKA_RESP_ERR__NOENT
#
# @return [Array<String, nil>] List of values for the header, in the order
#   they were returned. A null value pointer is reported as nil.
def get(name)
  name = name.to_s

  # Out-parameters filled in by each rd_kafka_header_get call.
  value = ::FFI::MemoryPointer.new(:pointer)
  size  = ::FFI::MemoryPointer.new(:pointer)

  found = []
  index = 0

  loop do
    err = ::Kafka::FFI.rd_kafka_header_get(self, index, name, value, size)

    case err
    when :ok
      # Copy the value out of native memory and move on to the next match.
      data = value.read_pointer
      found << (data.read_string(size.read(:size_t)) unless data.null?)
      index += 1
    when ::Kafka::FFI::RD_KAFKA_RESP_ERR__NOENT
      # No further header with this name: the collected list is complete.
      break
    else
      raise ::Kafka::ResponseError, err
    end
  end

  found
ensure
  # The locals are still nil if an error was raised before allocation.
  value&.free
  size&.free
end

#get_all ⇒ Hash<String, Array<String>>, Hash{} Also known as: to_hash, to_h

Retrieve all of the headers and their values

Returns:

  • (Hash<String, Array<String>>)

    Set of header keys and their values

  • (Hash{})

    Header is empty

Raises:

  • (Kafka::ResponseError)

    Error that occurred retrieving the headers



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/kafka/ffi/message/header.rb', line 128

# Retrieve all of the headers and their values.
#
# @raise [Kafka::ResponseError] rd_kafka_header_get_all returned something
#   other than :ok or RD_KAFKA_RESP_ERR__NOENT
#
# @return [Hash<String, Array<String, nil>>] Header keys mapped to their
#   values in the order returned. A null value pointer is reported as nil.
# @return [Hash{}] Header is empty
def get_all
  # Out-parameters filled in by each rd_kafka_header_get_all call.
  name  = ::FFI::MemoryPointer.new(:pointer)
  value = ::FFI::MemoryPointer.new(:pointer)
  size  = ::FFI::MemoryPointer.new(:pointer)

  headers = {}
  index = 0

  loop do
    err = ::Kafka::FFI.rd_kafka_header_get_all(self, index, name, value, size)

    case err
    when :ok
      # Copy the key and value out of native memory, grouping values by key.
      key = name.read_pointer.read_string
      data = value.read_pointer

      (headers[key] ||= []) << (data.null? ? nil : data.read_string(size.read(:size_t)))
      index += 1
    when ::Kafka::FFI::RD_KAFKA_RESP_ERR__NOENT
      # Index is past the last header: the collected set is complete.
      break
    else
      raise ::Kafka::ResponseError, err
    end
  end

  headers
ensure
  # The locals are still nil if an error was raised before allocation.
  name&.free
  value&.free
  size&.free
end

#get_last(name) ⇒ String?

Find the last header in the list that matches the given name.

Parameters:

  • name (String)

    Header key name

Returns:

  • (String)

    Value of the last matching header with name

  • (nil)

    No header with that name exists

Raises:

  • (Kafka::ResponseError)

    Error that occurred retrieving the header value



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/kafka/ffi/message/header.rb', line 177

# Find the last header in the list that matches the given name.
#
# @param name [String] Header key name (converted with #to_s)
#
# @raise [Kafka::ResponseError] rd_kafka_header_get_last returned something
#   other than :ok or RD_KAFKA_RESP_ERR__NOENT
#
# @return [String] Value of the last matching header with name
# @return [nil] No header with that name exists, or the value pointer
#   returned for it is null
def get_last(name)
  name  = name.to_s
  value = ::FFI::MemoryPointer.new(:pointer)
  size  = ::FFI::MemoryPointer.new(:pointer)

  err = ::Kafka::FFI.rd_kafka_header_get_last(self, name, value, size)
  if err != :ok
    # No header with that name exists so just return nil
    return nil if err == ::Kafka::FFI::RD_KAFKA_RESP_ERR__NOENT

    raise ::Kafka::ResponseError, err
  end

  ptr = value.read_pointer
  ptr.null? ? nil : ptr.read_string(size.read(:size_t))
ensure
  # Either local is still nil if name.to_s or an allocation raised. Guarding
  # the frees keeps that original error from being replaced by a
  # NoMethodError on nil.
  value.free if value
  size.free if size
end

#remove(name) ⇒ Object

Remove all headers with the given name.

Parameters:

  • name (String)

    Header key name to remove

Raises:

  • (Kafka::ResponseError)

    Error that occurred removing the header



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/kafka/ffi/message/header.rb', line 62

# Remove all headers with the given name.
#
# @param name [String] Header key name to remove (converted with #to_s)
#
# @raise [Kafka::ResponseError] rd_kafka_header_remove returned something
#   other than :ok or RD_KAFKA_RESP_ERR__NOENT
#
# @return [nil]
def remove(name)
  name = name.to_s

  err = ::Kafka::FFI.rd_kafka_header_remove(self, name)
  case err
  when :ok, ::Kafka::FFI::RD_KAFKA_RESP_ERR__NOENT
    # A missing header is treated like a successful removal, since the end
    # state (key doesn't exist) is the same.
    nil
  else
    raise ::Kafka::ResponseError, err
  end
end