Class: Presto::Client::StatementClient
- Inherits:
-
Object
- Object
- Presto::Client::StatementClient
- Defined in:
- lib/presto/client/statement_client.rb
Constant Summary collapse
- JSON_OPTIONS =
Presto can return JSON that is nested more deeply than the parser's default limit, so the nesting check is disabled.
{ :max_nesting => false }
Instance Attribute Summary collapse
-
#exception ⇒ Object
readonly
Returns the value of attribute exception.
-
#query ⇒ Object
readonly
Returns the value of attribute query.
Instance Method Summary collapse
- #advance ⇒ Object
- #cancel_leaf_stage ⇒ Object
- #close ⇒ Object
- #closed? ⇒ Boolean
- #current_results ⇒ Object
- #current_results_headers ⇒ Object
- #debug? ⇒ Boolean
- #exception? ⇒ Boolean
- #faraday_get_with_retry(uri, &block) ⇒ Object
- #has_next? ⇒ Boolean
-
#initialize(faraday, query, options, next_uri = nil) ⇒ StatementClient
constructor
A new instance of StatementClient.
- #query_failed? ⇒ Boolean
- #query_info ⇒ Object
- #query_succeeded? ⇒ Boolean
- #raise_if_timeout! ⇒ Object
- #raise_timeout_error! ⇒ Object
Constructor Details
#initialize(faraday, query, options, next_uri = nil) ⇒ StatementClient
Returns a new instance of StatementClient.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/presto/client/statement_client.rb', line 29
#
# Builds a client for one statement. When +next_uri+ is given, the client
# resumes an already-running statement by fetching that URI; otherwise it
# submits +query+ through +post_query_request!+.
#
# @param faraday [Object] HTTP connection; this class calls +get+ and +delete+ on it
# @param query [Object] the statement to run; stored as-is and exposed via +#query+
# @param options [Hash] settings; the keys read here are +:retry_timeout+
#   (defaults to 120), +:model_version+, +:plan_timeout+ and +:query_timeout+
# @param next_uri [String, nil] URI of an in-flight statement to resume
def initialize(faraday, query, options, next_uri=nil)
  @faraday = faraday

  @options = options
  @query = query

  @closed = false
  @exception = nil
  @retry_timeout = options[:retry_timeout] || 120

  # Pick the versioned model namespace ("0.149" -> V0_149) when one is
  # requested; otherwise fall back to the default Models namespace.
  model_version = @options[:model_version]
  if model_version
    @models = ModelVersions.const_get("V#{model_version.tr(".", "_")}")
  else
    @models = Models
  end

  @plan_timeout = options[:plan_timeout]
  @query_timeout = options[:query_timeout]

  if @plan_timeout || @query_timeout
    # this is set before the first call of faraday_get_with_retry so that
    # resuming StatementClient with next_uri is also under timeout control.
    @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  if next_uri
    response = faraday_get_with_retry(next_uri)
    @results_headers = response.headers
    @results = @models::QueryResults.decode(parse_body(response))
  else
    post_query_request!
  end
end
Instance Attribute Details
#exception ⇒ Object (readonly)
Returns the value of attribute exception.
98 99 100 |
# File 'lib/presto/client/statement_client.rb', line 98
#
# Reader for the error recorded by this client.
#
# @return [Object, nil] the stored @exception, or nil when none was recorded
def exception
  @exception
end
#query ⇒ Object (readonly)
Returns the value of attribute query.
88 89 90 |
# File 'lib/presto/client/statement_client.rb', line 88
#
# Reader for the statement this client was created with.
#
# @return [Object] the stored @query
def query
  @query
end
Instance Method Details
#advance ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/presto/client/statement_client.rb', line 124
#
# Fetches the next page of results and replaces the current results and
# response headers with it.
#
# @return [Boolean] false when the client is closed or there is no next
#   page; true after a page was fetched
def advance
  return false if closed? || !has_next?

  next_page_uri = @results.next_uri
  reply = faraday_get_with_retry(next_page_uri)
  @results_headers = reply.headers
  @results = decode_model(next_page_uri, parse_body(reply), @models::QueryResults)

  # Timeouts are checked after the new page is stored.
  raise_if_timeout!

  true
end
#cancel_leaf_stage ⇒ Object
243 244 245 246 247 248 249 |
# File 'lib/presto/client/statement_client.rb', line 243
#
# Issues a DELETE against the current results' partial_cancel_uri, if any.
#
# @return [Object, nil] whatever @faraday.delete returns, or nil when the
#   current results have no partial_cancel_uri
def cancel_leaf_stage
  cancel_uri = @results.partial_cancel_uri
  return unless cancel_uri

  @faraday.delete { |req| req.url cancel_uri }
end
#close ⇒ Object
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 |
# File 'lib/presto/client/statement_client.rb', line 251
#
# Marks the client closed. If the current results still have a next_uri,
# a DELETE is sent to it first. Does nothing when already closed.
#
# @return [nil]
def close
  return if @closed

  begin
    pending_uri = @results.next_uri
    @faraday.delete { |req| req.url pending_uri } if pending_uri
  rescue StandardError
    # Best effort: a failed DELETE must not prevent the client from closing.
  end

  @closed = true
  nil
end
#closed? ⇒ Boolean
94 95 96 |
# File 'lib/presto/client/statement_client.rb', line 94
#
# @return [Boolean] the stored @closed flag
def closed?
  @closed
end
#current_results ⇒ Object
112 113 114 |
# File 'lib/presto/client/statement_client.rb', line 112
#
# @return [Object, nil] the most recently stored results object (@results)
def current_results
  @results
end
#current_results_headers ⇒ Object
116 117 118 |
# File 'lib/presto/client/statement_client.rb', line 116
#
# @return [Object, nil] the headers stored alongside the current results
#   (@results_headers)
def current_results_headers
  @results_headers
end
#debug? ⇒ Boolean
90 91 92 |
# File 'lib/presto/client/statement_client.rb', line 90
#
# @return [Boolean] true when the :debug option is truthy
def debug?
  @options[:debug] ? true : false
end
#exception? ⇒ Boolean
100 101 102 |
# File 'lib/presto/client/statement_client.rb', line 100
#
# Tells whether an error has been recorded on this client. The result is
# coerced to a strict boolean, matching the other predicates in this
# class; use +#exception+ to obtain the error object itself.
#
# @return [Boolean] true when @exception is set, false otherwise
def exception?
  !!@exception
end
#faraday_get_with_retry(uri, &block) ⇒ Object
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/presto/client/statement_client.rb', line 176
#
# GETs +uri+, retrying on connection/timeout errors and on HTTP 503 until
# @retry_timeout seconds have passed or the client is closed.
#
# @param uri [String] the URI to fetch
# @param block [Proc] accepted but not used by this method
# @return [Object] the response, once it has status 200 and a non-empty body
# @raise [PrestoHttpError] for any non-503 response that is not a
#   200-with-body, or with status 408 when retries are exhausted
# @raise [StandardError] any other error raised by the GET is stored in
#   @exception and re-raised
def faraday_get_with_retry(uri, &block)
  began_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  tries = 0
  keep_trying = true

  # The body always runs at least once; the retry condition is evaluated
  # at the bottom of each pass.
  while keep_trying
    reply =
      begin
        @faraday.get(uri)
      rescue Faraday::Error::TimeoutError, Faraday::Error::ConnectionFailed
        # temporary error: retry
        nil
      rescue => err
        @exception = err
        raise @exception
      end

    if reply
      return reply if reply.status == 200 && !reply.body.to_s.empty?

      # retry only if 503 Service Unavailable; anything else is a
      # deterministic error
      unless reply.status == 503
        @exception = PrestoHttpError.new(reply.status, "Presto API error at #{uri} returned #{reply.status}: #{reply.body}")
        raise @exception
      end
    end

    raise_if_timeout!

    # Back off a little longer after every failed attempt.
    tries += 1
    sleep tries * 0.1

    waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - began_at
    keep_trying = waited < @retry_timeout && !@closed
  end

  @exception = PrestoHttpError.new(408, "Presto API error due to timeout")
  raise @exception
end
#has_next? ⇒ Boolean
120 121 122 |
# File 'lib/presto/client/statement_client.rb', line 120
#
# @return [Boolean] true when the current results carry a next_uri
def has_next?
  @results.next_uri ? true : false
end
#query_failed? ⇒ Boolean
104 105 106 |
# File 'lib/presto/client/statement_client.rb', line 104
#
# @return [Boolean] true when the current results report an error
def query_failed?
  !@results.error.nil?
end
#query_info ⇒ Object
139 140 141 142 143 |
# File 'lib/presto/client/statement_client.rb', line 139
#
# Fetches /v1/query/<id> for the current results' id and decodes the
# response as a QueryInfo model.
#
# @return [Object] the result of decode_model for @models::QueryInfo
def query_info
  info_uri = "/v1/query/#{@results.id}"
  payload = parse_body(faraday_get_with_retry(info_uri))
  decode_model(info_uri, payload, @models::QueryInfo)
end
#query_succeeded? ⇒ Boolean
108 109 110 |
# File 'lib/presto/client/statement_client.rb', line 108
#
# @return [Boolean] true only when the current results report no error,
#   no exception has been recorded, and the client is not closed
def query_succeeded?
  @results.error.nil? && !(@exception || @closed)
end
#raise_if_timeout! ⇒ Object
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/presto/client/statement_client.rb', line 213
#
# Enforces the query and plan timeouts, measured from @started_at.
# Does nothing when no start time was recorded or when the current
# results have no next_uri (the query is already done).
#
# @return [nil] when no timeout has been exceeded
# @raise whatever raise_timeout_error! raises, when a timeout is exceeded
def raise_if_timeout!
  return unless @started_at

  # query is already done
  return if @results && @results.next_uri.nil?

  waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started_at

  raise_timeout_error! if @query_timeout && waited > @query_timeout

  # @results is not set (even first faraday_get_with_retry isn't called yet) or
  # result from Presto doesn't include result schema. Query planning isn't done yet.
  raise_timeout_error! if @plan_timeout && (@results.nil? || @results.columns.nil?) && waited > @plan_timeout
end
#raise_timeout_error! ⇒ Object
235 236 237 238 239 240 241 |
# File 'lib/presto/client/statement_client.rb', line 235
#
# Raises the query-timeout error, naming the query id when the current
# results provide one.
#
# @raise [PrestoQueryTimeoutError] always
def raise_timeout_error!
  query_id = @results && @results.id
  message = query_id ? "Query #{query_id} timed out" : "Query timed out"
  raise PrestoQueryTimeoutError, message
end