Class: Presto::Client::StatementClient

Inherits:
Object
  • Object
show all
Defined in:
lib/presto/client/statement_client.rb

Constant Summary collapse

HEADERS =
{
  "User-Agent" => "presto-ruby/#{VERSION}",
}
HTTP11_SEPARATOR =
["(", ")", "<", ">", "@", ",", ";", ":", "\\", "<", ">", "/", "[", "]", "?", "=", "{", "}", " ", "\v"]
HTTP11_TOKEN_CHARSET =
(32..126).map {|x| x.chr } - HTTP11_SEPARATOR
HTTP11_TOKEN_REGEXP =
/^[#{Regexp.escape(HTTP11_TOKEN_CHARSET.join)}]+\z/
HTTP11_CTL_CHARSET =
(0..31).map {|x| x.chr } + [127.chr]
HTTP11_CTL_CHARSET_REGEXP =
/[#{Regexp.escape(HTTP11_CTL_CHARSET.join)}]/

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(faraday, query, options) ⇒ StatementClient

Returns a new instance of StatementClient.



42
43
44
45
46
47
48
49
50
51
# File 'lib/presto/client/statement_client.rb', line 42

def initialize(faraday, query, options)
  @faraday = faraday
  @faraday.headers.merge!(HEADERS)

  @options = options
  @query = query
  @closed = false
  @exception = nil
  post_query_request!
end

Instance Attribute Details

#exceptionObject (readonly)

Returns the value of attribute exception.



111
112
113
# File 'lib/presto/client/statement_client.rb', line 111

def exception
  @exception
end

#queryObject (readonly)

Returns the value of attribute query.



101
102
103
# File 'lib/presto/client/statement_client.rb', line 101

def query
  @query
end

Instance Method Details

#advanceObject



133
134
135
136
137
138
139
140
141
142
143
# File 'lib/presto/client/statement_client.rb', line 133

def advance
  if closed? || !has_next?
    return false
  end
  uri = @results.next_uri

  body = faraday_get_with_retry(uri)
  @results = Models::QueryResults.decode(MultiJson.load(body))

  return true
end

#cancel_leaf_stageObject



180
181
182
183
184
185
186
187
188
# File 'lib/presto/client/statement_client.rb', line 180

def cancel_leaf_stage
  if uri = @results.next_uri
    response = @faraday.delete do |req|
      req.url uri
    end
    return response.status / 100 == 2
  end
  return false
end

#closeObject



211
212
213
214
215
216
217
218
219
220
# File 'lib/presto/client/statement_client.rb', line 211

def close
  return if @closed

  # cancel running statement
  # TODO make async reqeust and ignore response?
  cancel_leaf_stage

  @closed = true
  nil
end

#closed?Boolean

Returns:

  • (Boolean)


107
108
109
# File 'lib/presto/client/statement_client.rb', line 107

def closed?
  @closed
end

#current_resultsObject



125
126
127
# File 'lib/presto/client/statement_client.rb', line 125

def current_results
  @results
end

#debug?Boolean

Returns:

  • (Boolean)


103
104
105
# File 'lib/presto/client/statement_client.rb', line 103

def debug?
  !!@options[:debug]
end

#encode_properties(properties) ⇒ Object



196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/presto/client/statement_client.rb', line 196

def encode_properties(properties)
  # this is a hack to set same header multiple times.
  properties.map do |k, v|
    token = k.to_s
    field_value = v.to_s  # TODO LWS encoding is not implemented
    unless k =~ HTTP11_TOKEN_REGEXP
      raise Faraday::ClientError, "Key of properties can't include HTTP/1.1 control characters or separators (#{HTTP11_SEPARATOR.map {|c| c =~ /\s/ ? c.dump : c }.join(' ')})"
    end
    if field_value =~ HTTP11_CTL_CHARSET_REGEXP
      raise Faraday::ClientError, "Value of properties can't include HTTP/1.1 control characters"
    end
    "#{token}=#{field_value}"
  end.join("\r\n#{PrestoHeaders::PRESTO_SESSION}: ")
end

#exception?Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/presto/client/statement_client.rb', line 113

def exception?
  @exception
end

#faraday_get_with_retry(uri, &block) ⇒ Object

Raises:



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/presto/client/statement_client.rb', line 150

def faraday_get_with_retry(uri, &block)
  start = Time.now
  attempts = 0

  begin
    begin
      response = @faraday.get(uri)
    rescue => e
      @exception = e
      raise @exception
    end

    if response.status == 200 && !response.body.to_s.empty?
      return response.body
    end

    if response.status != 503  # retry only if 503 Service Unavailable
      # deterministic error
      @exception = PrestoHttpError.new(response.status, "Presto API error at #{uri} returned #{response.status}: #{response.body}")
      raise @exception
    end

    attempts += 1
    sleep attempts * 0.1
  end while (Time.now - start) < 2*60*60 && !@closed

  @exception = PrestoHttpError.new(408, "Presto API error due to timeout")
  raise @exception
end

#has_next?Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/presto/client/statement_client.rb', line 129

def has_next?
  !!@results.next_uri
end

#query_failed?Boolean

Returns:

  • (Boolean)


117
118
119
# File 'lib/presto/client/statement_client.rb', line 117

def query_failed?
  @results.error != nil
end

#query_infoObject



145
146
147
148
# File 'lib/presto/client/statement_client.rb', line 145

def query_info
  body = faraday_get_with_retry("/v1/query/#{@results.id}")
  Models::QueryInfo.decode(MultiJson.load(body))
end

#query_succeeded?Boolean

Returns:

  • (Boolean)


121
122
123
# File 'lib/presto/client/statement_client.rb', line 121

def query_succeeded?
  @results.error == nil && !@exception && !@closed
end