Class: Kafka::Broker
- Inherits:
-
Object
- Object
- Kafka::Broker
- Defined in:
- lib/kafka/broker.rb
Instance Method Summary collapse
- #add_partitions_to_txn(**options) ⇒ Object
- #address_match?(host, port) ⇒ Boolean
- #alter_configs(**options) ⇒ Object
- #api_versions ⇒ Object
- #commit_offsets(**options) ⇒ Object
- #connected? ⇒ Boolean
- #create_partitions(**options) ⇒ Object
- #create_topics(**options) ⇒ Object
- #delete_topics(**options) ⇒ Object
- #describe_configs(**options) ⇒ Object
- #describe_groups(**options) ⇒ Object
- #disconnect ⇒ nil
- #end_txn(**options) ⇒ Object
-
#fetch_messages(**options) ⇒ Kafka::Protocol::FetchResponse
Fetches messages from a specified topic and partition.
-
#fetch_metadata(**options) ⇒ Kafka::Protocol::MetadataResponse
Fetches cluster metadata from the broker.
- #fetch_offsets(**options) ⇒ Object
- #find_coordinator(**options) ⇒ Object
- #heartbeat(**options) ⇒ Object
- #init_producer_id(**options) ⇒ Object
-
#initialize(connection_builder:, host:, port:, node_id: nil, logger:) ⇒ Broker
constructor
A new instance of Broker.
- #join_group(**options) ⇒ Object
- #leave_group(**options) ⇒ Object
- #list_groups ⇒ Object
-
#list_offsets(**options) ⇒ Kafka::Protocol::ListOffsetResponse
Lists the offset of the specified topics and partitions.
-
#produce(**options) ⇒ Kafka::Protocol::ProduceResponse
Produces a set of messages to the broker.
- #sync_group(**options) ⇒ Object
- #to_s ⇒ String
Constructor Details
#initialize(connection_builder:, host:, port:, node_id: nil, logger:) ⇒ Broker
Returns a new instance of Broker.
9 10 11 12 13 14 15 16 |
# File 'lib/kafka/broker.rb', line 9 def initialize(connection_builder:, host:, port:, node_id: nil, logger:) @connection_builder = connection_builder @connection = nil @host = host @port = port @node_id = node_id @logger = TaggedLogger.new(logger) end |
Instance Method Details
#add_partitions_to_txn(**options) ⇒ Object
173 174 175 176 177 |
# File 'lib/kafka/broker.rb', line 173 def add_partitions_to_txn(**) request = Protocol::AddPartitionsToTxnRequest.new(**) send_request(request) end |
#address_match?(host, port) ⇒ Boolean
18 19 20 |
# File 'lib/kafka/broker.rb', line 18 def address_match?(host, port) host == @host && port == @port end |
#alter_configs(**options) ⇒ Object
137 138 139 140 141 |
# File 'lib/kafka/broker.rb', line 137 def alter_configs(**) request = Protocol::AlterConfigsRequest.new(**) send_request(request) end |
#api_versions ⇒ Object
155 156 157 158 159 |
# File 'lib/kafka/broker.rb', line 155 def api_versions request = Protocol::ApiVersionsRequest.new send_request(request) end |
#commit_offsets(**options) ⇒ Object
83 84 85 86 87 |
# File 'lib/kafka/broker.rb', line 83 def commit_offsets(**) request = Protocol::OffsetCommitRequest.new(**) send_request(request) end |
#connected? ⇒ Boolean
33 34 35 |
# File 'lib/kafka/broker.rb', line 33 def connected? !@connection.nil? end |
#create_partitions(**options) ⇒ Object
143 144 145 146 147 |
# File 'lib/kafka/broker.rb', line 143 def create_partitions(**) request = Protocol::CreatePartitionsRequest.new(**) send_request(request) end |
#create_topics(**options) ⇒ Object
119 120 121 122 123 |
# File 'lib/kafka/broker.rb', line 119 def create_topics(**) request = Protocol::CreateTopicsRequest.new(**) send_request(request) end |
#delete_topics(**options) ⇒ Object
125 126 127 128 129 |
# File 'lib/kafka/broker.rb', line 125 def delete_topics(**) request = Protocol::DeleteTopicsRequest.new(**) send_request(request) end |
#describe_configs(**options) ⇒ Object
131 132 133 134 135 |
# File 'lib/kafka/broker.rb', line 131 def describe_configs(**) request = Protocol::DescribeConfigsRequest.new(**) send_request(request) end |
#describe_groups(**options) ⇒ Object
161 162 163 164 165 |
# File 'lib/kafka/broker.rb', line 161 def describe_groups(**) request = Protocol::DescribeGroupsRequest.new(**) send_request(request) end |
#disconnect ⇒ nil
28 29 30 |
# File 'lib/kafka/broker.rb', line 28 def disconnect connection.close if connected? end |
#end_txn(**options) ⇒ Object
179 180 181 182 183 |
# File 'lib/kafka/broker.rb', line 179 def end_txn(**) request = Protocol::EndTxnRequest.new(**) send_request(request) end |
#fetch_messages(**options) ⇒ Kafka::Protocol::FetchResponse
Fetches messages from a specified topic and partition.
51 52 53 54 55 |
# File 'lib/kafka/broker.rb', line 51 def (**) request = Protocol::FetchRequest.new(**) send_request(request) end |
#fetch_metadata(**options) ⇒ Kafka::Protocol::MetadataResponse
Fetches cluster metadata from the broker.
41 42 43 44 45 |
# File 'lib/kafka/broker.rb', line 41 def (**) request = Protocol::MetadataRequest.new(**) send_request(request) end |
#fetch_offsets(**options) ⇒ Object
77 78 79 80 81 |
# File 'lib/kafka/broker.rb', line 77 def fetch_offsets(**) request = Protocol::OffsetFetchRequest.new(**) send_request(request) end |
#find_coordinator(**options) ⇒ Object
107 108 109 110 111 |
# File 'lib/kafka/broker.rb', line 107 def find_coordinator(**) request = Protocol::FindCoordinatorRequest.new(**) send_request(request) end |
#heartbeat(**options) ⇒ Object
113 114 115 116 117 |
# File 'lib/kafka/broker.rb', line 113 def heartbeat(**) request = Protocol::HeartbeatRequest.new(**) send_request(request) end |
#init_producer_id(**options) ⇒ Object
167 168 169 170 171 |
# File 'lib/kafka/broker.rb', line 167 def init_producer_id(**) request = Protocol::InitProducerIDRequest.new(**) send_request(request) end |
#join_group(**options) ⇒ Object
89 90 91 92 93 |
# File 'lib/kafka/broker.rb', line 89 def join_group(**) request = Protocol::JoinGroupRequest.new(**) send_request(request) end |
#leave_group(**options) ⇒ Object
101 102 103 104 105 |
# File 'lib/kafka/broker.rb', line 101 def leave_group(**) request = Protocol::LeaveGroupRequest.new(**) send_request(request) end |
#list_groups ⇒ Object
149 150 151 152 153 |
# File 'lib/kafka/broker.rb', line 149 def list_groups request = Protocol::ListGroupsRequest.new send_request(request) end |
#list_offsets(**options) ⇒ Kafka::Protocol::ListOffsetResponse
Lists the offset of the specified topics and partitions.
61 62 63 64 65 |
# File 'lib/kafka/broker.rb', line 61 def list_offsets(**) request = Protocol::ListOffsetRequest.new(**) send_request(request) end |
#produce(**options) ⇒ Kafka::Protocol::ProduceResponse
Produces a set of messages to the broker.
71 72 73 74 75 |
# File 'lib/kafka/broker.rb', line 71 def produce(**) request = Protocol::ProduceRequest.new(**) send_request(request) end |
#sync_group(**options) ⇒ Object
95 96 97 98 99 |
# File 'lib/kafka/broker.rb', line 95 def sync_group(**) request = Protocol::SyncGroupRequest.new(**) send_request(request) end |
#to_s ⇒ String
23 24 25 |
# File 'lib/kafka/broker.rb', line 23 def to_s "#{@host}:#{@port} (node_id=#{@node_id.inspect})" end |