Class: Kafka::FFI::Message

Inherits:
FFI::Struct
  • Object
show all
Defined in:
lib/kafka/ffi/message.rb

Defined Under Namespace

Classes: Header

Instance Method Summary collapse

Instance Method Details

#destroyObject

Frees resources used by the messages and hands ownership by to librdkafka. The application should call destroy when done processing the message.



199
200
201
202
203
# File 'lib/kafka/ffi/message.rb', line 199

def destroy
  if !null?
    ::Kafka::FFI.rd_kafka_message_destroy(self)
  end
end

#detach_headersnil, Message::Header

Get the Message’s headers and detach them from the Message (setting its headers to nil). Calling detach_headers means the applicaiton is now the owner of the returned Message::Header and must eventually call #destroy when the application is done with them.

Returns:

  • (nil)

    Message does not have any headers

  • (Message::Header)

    Set of headers

Raises:



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/kafka/ffi/message.rb', line 140

def detach_headers
  ptr = ::FFI::MemoryPointer.new(:pointer)

  err = ::Kafka::FFI.rd_kafka_message_detach_headers(self, ptr)
  case err
  when :ok
    if ptr.null?
      nil
    else
      Message::Header.new(ptr)
    end
  when RD_KAFKA_RESP_ERR__NOENT
    # Messages does not have headers
    nil
  else
    raise ::Kafka::ResponseError, err
  end
ensure
  ptr.free
end

#errornil, Kafka::ResponseError

Retrieve the error associated with this message. For consumers this is used to report per-topic+partition consumer errors. For producers this is set when received in the dr_msg_cb callback to signify a fatal error publishing the message.

Returns:

  • (nil)

    Message does not have an error

  • (Kafka::ResponseError)

    RD_KAFKA_RESP_ERR__* error code



26
27
28
29
30
# File 'lib/kafka/ffi/message.rb', line 26

def error
  if self[:err] != :ok
    ::Kafka::ResponseError.new(self[:err])
  end
end

#headersnil, Message::Header

Get the message header list

Returns:

  • (nil)

    Message does not have any headers

  • (Message::Header)

    Set of headers

Raises:



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/kafka/ffi/message.rb', line 110

def headers
  ptr = ::FFI::MemoryPointer.new(:pointer)

  err = ::Kafka::FFI.rd_kafka_message_headers(self, ptr)
  case err
  when :ok
    if ptr.null?
      nil
    else
      Message::Header.new(ptr)
    end
  when RD_KAFKA_RESP_ERR__NOENT
    # Messages does not have headers
    nil
  else
    raise ::Kafka::ResponseError, err
  end
ensure
  ptr.free
end

#keynil, String

Returns the optional message key used to publish the message. This key is used for partition assignment based on the ‘partitioner` or `partitioner_cb` config options.

Returns:

  • (nil)

    No partitioning key was provided

  • (String)

    The partitioning key



50
51
52
53
54
55
56
# File 'lib/kafka/ffi/message.rb', line 50

def key
  if self[:key].null?
    return nil
  end

  self[:key].read_string(self[:key_len])
end

#latencyInteger?

Retrieve the latency since the Message was published to Kafka.

Returns:

  • (Integer)

    Latency since produce() call in microseconds

  • (nil)

    Latency not available



191
192
193
194
# File 'lib/kafka/ffi/message.rb', line 191

def latency
  latency = ::Kafka::FFI.rd_kafka_message_latency(self)
  latency == -1 ? nil : latency
end

#offsetInteger, RD_KAFKA_OFFSET_INVALID

Returns the message’s offset as published in the topic’s partition. When error != nil, offset the error occurred at.

Returns:



71
72
73
# File 'lib/kafka/ffi/message.rb', line 71

def offset
  self[:offset]
end

#opaquenil, FFI::Pointer

Note:

Using the opaque is dangerous and requires that the application maintain a reference to the object passed to produce. Failing to do so will cause segfaults due to the object having been garbage collected.

Returns the per message opaque pointer that was given to produce. This is a pointer to a Ruby object owned by the application.

Examples:

Retrieve object from opaque

require "fiddle"
obj = Fiddle::Pointer.new(msg.opaque.to_i).to_value

Returns:

  • (nil)

    Opaque was not set

  • (FFI::Pointer)

    Pointer to opaque address



88
89
90
# File 'lib/kafka/ffi/message.rb', line 88

def opaque
  self[:private]
end

#partitionInteger

Returns the partition the message was published to.

Returns:

  • (Integer)

    Partition



61
62
63
# File 'lib/kafka/ffi/message.rb', line 61

def partition
  self[:partition]
end

#payloadString

Returns the message’s payload. When error != nil, will contain a string describing the error.

Returns:

  • (String)

    Message payload or error string.



96
97
98
99
100
101
102
# File 'lib/kafka/ffi/message.rb', line 96

def payload
  if self[:payload].null?
    return nil
  end

  self[:payload].read_string(self[:len])
end

#set_headers(headers) ⇒ Object Also known as: headers=

Note:

The Message takes ownership of the headers and they will be destroyed automatically with the Message.

Replace the Message’s headers with a new set.



167
168
169
# File 'lib/kafka/ffi/message.rb', line 167

def set_headers(headers)
  ::Kafka::FFI.rd_kafka_set_headers(self, headers)
end

#timestampInteger?

Retrieve the timestamp for a consumed message.

Examples:

Convert timestamp to a Time

ts = message.timestamp
ts = ts && Time.at(0, ts, :millisecond).utc

Returns:

  • (Integer)

    Message timestamp as milliseconds since unix epoch

  • (nil)

    Timestamp is not available



181
182
183
184
185
# File 'lib/kafka/ffi/message.rb', line 181

def timestamp
  # @todo: Type (second param) [rd_kafka_timestamp_type_t enum]
  ts = ::Kafka::FFI.rd_kafka_message_timestamp(self, nil)
  ts == -1 ? nil : ts
end

#topicnil, String

Returns the name of the Topic the Message was published to.

Returns:

  • (nil)

    Topic information was not available

  • (String)

    Name of the Topic the message was published to.



36
37
38
39
40
41
42
# File 'lib/kafka/ffi/message.rb', line 36

def topic
  if self[:rkt].nil?
    return nil
  end

  self[:rkt].name
end