Method: Kafka::Connection#send_request

Defined in:
lib/kafka/connection.rb

#send_request(request) ⇒ Object

Sends a request over the connection.

Parameters:

  • the request that should be encoded and written.

Returns:

  • the response.



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/kafka/connection.rb', line 86

def send_request(request)
  api_name = Protocol.api_name(request.api_key)

  # Default notification payload.
  notification = {
    broker_host: @host,
    api: api_name,
    request_size: 0,
    response_size: 0,
  }

  raise IdleConnection if idle?

  @logger.push_tags(api_name)
  @instrumenter.instrument("request.connection", notification) do
    open unless open?

    @correlation_id += 1

    @logger.debug "Sending #{api_name} API request #{@correlation_id} to #{to_s}"

    write_request(request, notification)

    response_class = request.response_class
    response = wait_for_response(response_class, notification) unless response_class.nil?

    @last_request = Time.now

    response
  end
rescue SystemCallError, EOFError, IOError => e
  close

  raise ConnectionError, "Connection error #{e.class}: #{e}"
ensure
  @logger.pop_tags
end