Class: Presto::Client::StatementClient

Inherits:
Object
  • Object
show all
Defined in:
lib/presto/client/statement_client.rb

Constant Summary collapse

JSON_OPTIONS =

Presto can return too deep nested JSON

{
    :max_nesting => false
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(faraday, query, options, next_uri = nil) ⇒ StatementClient

Returns a new instance of StatementClient.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/presto/client/statement_client.rb', line 29

def initialize(faraday, query, options, next_uri=nil)
  @faraday = faraday

  @options = options
  @query = query
  @closed = false
  @exception = nil
  @retry_timeout = options[:retry_timeout] || 120
  if model_version = @options[:model_version]
    @models = ModelVersions.const_get("V#{model_version.gsub(".", "_")}")
  else
    @models = Models
  end

  @plan_timeout = options[:plan_timeout]
  @query_timeout = options[:query_timeout]

  if @plan_timeout || @query_timeout
    # this is set before the first call of faraday_get_with_retry so that
    # resuming StatementClient with next_uri is also under timeout control.
    @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  if next_uri
    response = faraday_get_with_retry(next_uri)
    @results_headers = response.headers
    @results = @models::QueryResults.decode(parse_body(response))
  else
    post_query_request!
  end
end

Instance Attribute Details

#exceptionObject (readonly)

Returns the value of attribute exception.



98
99
100
# File 'lib/presto/client/statement_client.rb', line 98

def exception
  @exception
end

#queryObject (readonly)

Returns the value of attribute query.



88
89
90
# File 'lib/presto/client/statement_client.rb', line 88

def query
  @query
end

Instance Method Details

#advanceObject



124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/presto/client/statement_client.rb', line 124

def advance
  if closed? || !has_next?
    return false
  end

  uri = @results.next_uri
  response = faraday_get_with_retry(uri)
  @results_headers = response.headers
  @results = decode_model(uri, parse_body(response), @models::QueryResults)

  raise_if_timeout!

  return true
end

#cancel_leaf_stageObject



243
244
245
246
247
248
249
# File 'lib/presto/client/statement_client.rb', line 243

def cancel_leaf_stage
  if uri = @results.partial_cancel_uri
    @faraday.delete do |req|
      req.url uri
    end
  end
end

#closeObject



251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/presto/client/statement_client.rb', line 251

def close
  return if @closed

  begin
    if uri = @results.next_uri
      @faraday.delete do |req|
        req.url uri
      end
    end
  rescue => e
  end

  @closed = true
  nil
end

#closed?Boolean

Returns:

  • (Boolean)


94
95
96
# File 'lib/presto/client/statement_client.rb', line 94

def closed?
  @closed
end

#current_resultsObject



112
113
114
# File 'lib/presto/client/statement_client.rb', line 112

def current_results
  @results
end

#current_results_headersObject



116
117
118
# File 'lib/presto/client/statement_client.rb', line 116

def current_results_headers
  @results_headers
end

#debug?Boolean

Returns:

  • (Boolean)


90
91
92
# File 'lib/presto/client/statement_client.rb', line 90

def debug?
  !!@options[:debug]
end

#exception?Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/presto/client/statement_client.rb', line 100

def exception?
  @exception
end

#faraday_get_with_retry(uri, &block) ⇒ Object

Raises:



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/presto/client/statement_client.rb', line 176

def faraday_get_with_retry(uri, &block)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  attempts = 0

  begin
    begin
      response = @faraday.get(uri)
    rescue Faraday::Error::TimeoutError, Faraday::Error::ConnectionFailed
      # temporally error to retry
      response = nil
    rescue => e
      @exception = e
      raise @exception
    end

    if response
      if response.status == 200 && !response.body.to_s.empty?
        return response
      end

      if response.status != 503  # retry only if 503 Service Unavailable
        # deterministic error
        @exception = PrestoHttpError.new(response.status, "Presto API error at #{uri} returned #{response.status}: #{response.body}")
        raise @exception
      end
    end

    raise_if_timeout!

    attempts += 1
    sleep attempts * 0.1
  end while (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) < @retry_timeout && !@closed

  @exception = PrestoHttpError.new(408, "Presto API error due to timeout")
  raise @exception
end

#has_next?Boolean

Returns:

  • (Boolean)


120
121
122
# File 'lib/presto/client/statement_client.rb', line 120

def has_next?
  !!@results.next_uri
end

#query_failed?Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/presto/client/statement_client.rb', line 104

def query_failed?
  @results.error != nil
end

#query_infoObject



139
140
141
142
143
# File 'lib/presto/client/statement_client.rb', line 139

def query_info
  uri = "/v1/query/#{@results.id}"
  response = faraday_get_with_retry(uri)
  decode_model(uri, parse_body(response), @models::QueryInfo)
end

#query_succeeded?Boolean

Returns:

  • (Boolean)


108
109
110
# File 'lib/presto/client/statement_client.rb', line 108

def query_succeeded?
  @results.error == nil && !@exception && !@closed
end

#raise_if_timeout!Object



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/presto/client/statement_client.rb', line 213

def raise_if_timeout!
  if @started_at
    if @results && @results.next_uri == nil
      # query is already done
      return
    end

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started_at

    if @query_timeout && elapsed > @query_timeout
      raise_timeout_error!
    end

    if @plan_timeout && (@results == nil || @results.columns == nil) &&
        elapsed > @plan_timeout
      # @results is not set (even first faraday_get_with_retry isn't called yet) or
      # result from Presto doesn't include result schema. Query planning isn't done yet.
      raise_timeout_error!
    end
  end
end

#raise_timeout_error!Object



235
236
237
238
239
240
241
# File 'lib/presto/client/statement_client.rb', line 235

def raise_timeout_error!
  if query_id = @results && @results.id
    raise PrestoQueryTimeoutError, "Query #{query_id} timed out"
  else
    raise PrestoQueryTimeoutError, "Query timed out"
  end
end