Class: Kafka::FFI::Event

Inherits:
OpaquePointer
Defined in:
lib/kafka/ffi/event.rb

Defined Under Namespace

Classes: LogMessage

Instance Attribute Summary

Attributes inherited from OpaquePointer

#pointer

Instance Method Summary

Methods inherited from OpaquePointer

by_ref, from_native, inherited, #initialize, to_native

Constructor Details

This class inherits a constructor from Kafka::FFI::OpaquePointer

Instance Method Details

#config_string ⇒ String?

Returns the configuration for the event or nil if the configuration property is not set or not applicable for the event type.

Events:

- RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH

Returns:

  • (String)

    Configuration string for the event

  • (nil)

    Property not set or not applicable.



# File 'lib/kafka/ffi/event.rb', line 101

def config_string
  ::Kafka::FFI.rd_kafka_event_string(self)
end

#destroy ⇒ Object

TODO:

Is the application responsible for calling #destroy?

Destroys the event, releasing its resources back to the system.



# File 'lib/kafka/ffi/event.rb', line 207

def destroy
  # It is safe to call destroy even if the Event's pointer is NULL but it
  # doesn't do anything so might as well guard against it just in case.
  if !pointer.null?
    ::Kafka::FFI.rd_kafka_event_destroy(self)
  end
end
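
The TODO above leaves ownership open. In librdkafka's C API, an event returned from a queue poll is released by the application with rd_kafka_event_destroy. Assuming this binding does not free events automatically, one way to make sure #destroy always runs is an ensure-based wrapper. This is only a sketch: `with_event` is a hypothetical helper that is not part of Kafka::FFI, and `FakeEvent` is a stand-in so the example runs without librdkafka.

```ruby
# Hypothetical helper (not part of Kafka::FFI): yields the event and always
# calls #destroy afterwards, even if the block raises.
def with_event(event)
  yield event
ensure
  event.destroy
end

# Stand-in for a real event so the sketch runs without librdkafka.
class FakeEvent
  attr_reader :destroyed

  def initialize
    @destroyed = false
  end

  def name
    "Fake"
  end

  def destroy
    @destroyed = true
  end
end

event = FakeEvent.new
result = with_event(event) { |e| e.name }
```

The wrapper returns the value of the block, so callers can extract what they need from the event before it is released.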

#error ⇒ nil, Kafka::ResponseError

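Returns the error for the event as a Kafka::ResponseError, or nil if there was no error.

Returns:

  • (nil)

    No error for the Event

  • (Kafka::ResponseError)

    Error for the Event, built from the error code and #error_string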

See Also:



# File 'lib/kafka/ffi/event.rb', line 111

def error
  err = ::Kafka::FFI.rd_kafka_event_error(self)
  if err != :ok
    ::Kafka::ResponseError.new(err, error_string)
  end
end

#error_is_fatal ⇒ Boolean Also known as: error_is_fatal?

Returns whether the Event represents a fatal error. The result is falsy (nil) when the Event has no error.

Returns:

  • (Boolean)

    There is an error for the Event and it is fatal.



# File 'lib/kafka/ffi/event.rb', line 129

def error_is_fatal
  error && ::Kafka::FFI.rd_kafka_event_error_is_fatal(self)
end

#error_string ⇒ nil, String

Returns a description of the error or nil when there is no error.

Returns:

  • (nil)

    No error for the Event

  • (String)

    Description of the error



# File 'lib/kafka/ffi/event.rb', line 122

def error_string
  ::Kafka::FFI.rd_kafka_event_error_string(self)
end
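
The three error accessors above (#error, #error_is_fatal, #error_string) are typically read together. The following is a minimal sketch of that, not code from the library: `describe_error` is a hypothetical helper, and `StubEvent` is a stand-in Struct that only mimics the documented return values so the example runs without librdkafka.

```ruby
# Hypothetical helper: summarizes the error state of any object that responds
# to #error, #error_is_fatal and #error_string as documented above.
def describe_error(event)
  return "ok" if event.error.nil?

  # #error_is_fatal is only meaningful when there is an error.
  severity = event.error_is_fatal ? "fatal" : "non-fatal"
  "#{severity}: #{event.error_string}"
end

# Stand-in for a real event; field values are made up for illustration.
StubEvent = Struct.new(:error, :error_string, :error_is_fatal)

healthy = StubEvent.new(nil, nil, nil)
broken  = StubEvent.new(StandardError.new("stub"), "Broker transport failure", true)

puts describe_error(healthy)
puts describe_error(broken)
```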

#log ⇒ Event::LogMessage

Returns the log message attached to the event.

Events:

- RD_KAFKA_EVENT_LOG

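Returns:

  • (Event::LogMessage)

    Log message attached to the Event

  • (nil)

    Event type does not support log messages.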



# File 'lib/kafka/ffi/event.rb', line 140

def log
  facility = ::FFI::MemoryPointer.new(:pointer)
  message  = ::FFI::MemoryPointer.new(:pointer)
  # rd_kafka_event_log writes the level through an int*, so allocate an int.
  level    = ::FFI::MemoryPointer.new(:int)

  exists = ::Kafka::FFI.rd_kafka_event_log(self, facility, message, level)
  if exists != 0
    # Event type does not support log messages.
    return nil
  end

  LogMessage.new(
    facility.read_pointer.read_string,
    message.read_pointer.read_string,
    level.read_int,
  )
ensure
  facility.free
  message.free
  level.free
end
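
A common use of #log is forwarding librdkafka's log lines to an application logger. The sketch below assumes that LogMessage exposes #facility, #message and #level readers (inferred from the argument order of LogMessage.new above, not confirmed by this page) and that the level is a syslog severity from 0 to 7, as in librdkafka's log callback. `forward_log` and the stub Structs are hypothetical names used only for illustration.

```ruby
require "logger"
require "stringio"

# Maps syslog severities (0..7) onto Ruby Logger severities.
SYSLOG_TO_LOGGER = {
  0 => Logger::FATAL, # emerg
  1 => Logger::FATAL, # alert
  2 => Logger::FATAL, # crit
  3 => Logger::ERROR, # err
  4 => Logger::WARN,  # warning
  5 => Logger::INFO,  # notice
  6 => Logger::INFO,  # info
  7 => Logger::DEBUG, # debug
}.freeze

# Hypothetical helper: writes the event's log message to `logger`.
# Returns false when the event carries no log message.
def forward_log(event, logger)
  entry = event.log
  return false if entry.nil?

  severity = SYSLOG_TO_LOGGER.fetch(entry.level, Logger::UNKNOWN)
  logger.add(severity, entry.message, entry.facility)
  true
end

# Stand-ins so the sketch runs without librdkafka.
StubLog      = Struct.new(:facility, :message, :level)
StubLogEvent = Struct.new(:log)

out    = StringIO.new
logger = Logger.new(out)

forwarded = forward_log(StubLogEvent.new(StubLog.new("FETCH", "partition moved", 4)), logger)
skipped   = forward_log(StubLogEvent.new(nil), logger)
```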

#messages {|message| ... } ⇒ Array<Message>?

Note:

Do not call #destroy on the Messages

Retrieves the set of messages attached to the event. When a block is given, each Message is yielded to the block instead of being returned in an Array.

Events:

- RD_KAFKA_EVENT_FETCH
- RD_KAFKA_EVENT_DR

Yields:

  • (message)

    Iterate over available messages

Yield Parameters:

Returns:

  • (Array<Message>)

    Messages attached to the Event; empty when the Event has no Messages

  • (nil)

    A block was given.



# File 'lib/kafka/ffi/event.rb', line 48

def messages
  # This departs from the librdkafka API due to having a collection of
  # methods that have funky semantics for Ruby.

  # @todo Messages are only on RD_KAFKA_EVENT_FETCH and RD_KAFKA_EVENT_DR.
  #   Need to test what happens with other event types.

  # No block so fetch all of the messages and return them as an array.
  if !block_given?
    count = ::Kafka::FFI.rd_kafka_event_message_count(self)
    if count == 0
      return []
    end

    begin
      # Allocates enough memory for the full set but only converts as many
      # as were returned.
      # @todo Retrieve all until sum(ret) == count?
      ptr = ::FFI::MemoryPointer.new(:pointer, count)
      ret = ::Kafka::FFI.rd_kafka_event_message_array(self, ptr, count)

      # Map the return pointers to Messages
      return ptr.read_array_of_pointer(ret).map! { |p| Message.from_native(p) }
    ensure
      ptr.free
    end
  end

  # Block was passed so use rd_kafka_event_message_next to yield each
  # message in turn. No scratch memory is needed for this path.
  loop do
    msg = ::Kafka::FFI.rd_kafka_event_message_next(self)
    if msg.null?
      break
    end

    yield(msg)
  end

  # Nothing to return when iterating with a block.
  nil
end
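
The two calling styles of #messages can be sketched as follows. `StubMessagesEvent` is a hypothetical stand-in that mirrors only the documented contract (an Array without a block, nil with a block) so the example runs without librdkafka; it says nothing about how the real method manages memory or iteration state.

```ruby
# Stand-in mirroring the documented contract of #messages.
class StubMessagesEvent
  def initialize(messages)
    @messages = messages
  end

  def messages
    # Without a block: return everything as an Array.
    return @messages.dup unless block_given?

    # With a block: yield each message and return nil.
    @messages.each { |message| yield(message) }
    nil
  end
end

event = StubMessagesEvent.new(%w[a b c])

# Array form: collect first, process later.
batch = event.messages

# Block form: process each message as it is handed over.
seen = []
block_result = event.messages { |message| seen << message }
```

The block form avoids building an intermediate Array, which may matter for large batches.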

#name ⇒ String

Returns the name of the event’s type.

Returns:

  • (String)

    Name of the type of event



# File 'lib/kafka/ffi/event.rb', line 30

def name
  ::Kafka::FFI.rd_kafka_event_name(self)
end

#stats ⇒ nil, String

Extracts the statistics from the event as a JSON-encoded String.

Events:

- RD_KAFKA_EVENT_STATS

Returns:

  • (nil)

    Event type does not support stats

  • (String)

    JSON encoded stats information.



# File 'lib/kafka/ffi/event.rb', line 169

def stats
  # Calling stats on an unsupported type causes a segfault with librdkafka
  # 1.3.0.
  if type != :stats
    return nil
  end

  ::Kafka::FFI.rd_kafka_event_stats(self)
end
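
Because #stats returns either nil or a JSON-encoded String, callers usually guard for nil before parsing. A minimal sketch, assuming nothing beyond that documented contract: `parsed_stats` is a hypothetical helper, `StubStatsEvent` is a stand-in, and the JSON payload is a made-up fragment rather than real librdkafka output.

```ruby
require "json"

# Hypothetical helper: returns the stats as a Hash, or nil when the event
# does not carry stats.
def parsed_stats(event)
  raw = event.stats
  return nil if raw.nil?

  JSON.parse(raw)
end

# Stand-in so the sketch runs without librdkafka.
StubStatsEvent = Struct.new(:stats)

with_stats    = StubStatsEvent.new('{"name":"example-client","msg_cnt":3}')
without_stats = StubStatsEvent.new(nil)

stats_hash = parsed_stats(with_stats)
no_stats   = parsed_stats(without_stats)
```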

#topic_partition ⇒ TopicPartition

Note:

The application MUST call #destroy on the TopicPartition when done.

Returns the topic partition from the Event.

Events:

- RD_KAFKA_EVENT_ERROR

Returns:



# File 'lib/kafka/ffi/event.rb', line 200

def topic_partition
  ::Kafka::FFI.rd_kafka_event_topic_partition(self)
end

#topic_partition_list ⇒ TopicPartitionList

Note:

Application MUST NOT call #destroy on the list

Returns the topic partition list from the Event.

Events:

- RD_KAFKA_EVENT_REBALANCE
- RD_KAFKA_EVENT_OFFSET_COMMIT

Returns:



# File 'lib/kafka/ffi/event.rb', line 188

def topic_partition_list
  ::Kafka::FFI.rd_kafka_event_topic_partition_list(self)
end

#type ⇒ Symbol

Returns the event’s type.

Returns:

  • (Symbol)

    Type of the event

See Also:

  • RD_KAFKA_EVENT_*


# File 'lib/kafka/ffi/event.rb', line 23

def type
  ::Kafka::FFI.rd_kafka_event_type(self)
end
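
Since most accessors on this class only apply to particular event types, callers generally branch on #type before reading anything else. The sketch below shows one way to do that. Only the `:stats` symbol appears in the source above; `:log` and `:error` are assumed symbol names for the corresponding RD_KAFKA_EVENT_* constants, and `route` and `StubTypedEvent` are hypothetical.

```ruby
# Hypothetical dispatcher: picks the accessor that matches the event type and
# returns a [destination, payload] pair.
def route(event)
  case event.type
  when :stats then [:metrics, event.stats]
  when :log   then [:logging, event.log]           # assumed symbol name
  when :error then [:errors, event.error_string]   # assumed symbol name
  else             [:ignored, event.name]
  end
end

# Stand-in so the sketch runs without librdkafka.
StubTypedEvent = Struct.new(:type, :name, :stats, :log, :error_string, keyword_init: true)

stats_route   = route(StubTypedEvent.new(type: :stats, name: "Stats", stats: "{}"))
error_route   = route(StubTypedEvent.new(type: :error, name: "Error", error_string: "Timed out"))
unknown_route = route(StubTypedEvent.new(type: :rebalance, name: "Rebalance"))
```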