Module: TreasureData::API::Job

Included in:
TreasureData::API
Defined in:
lib/td/client/api/job.rb

Defined Under Namespace

Classes: HTTPServerException, NullInflate

Instance Method Summary

Instance Method Details

#hive_query(q, db = nil, result_url = nil, priority = nil, retry_limit = nil, opts = {}) ⇒ String

Returns job_id.

Parameters:

  • q (String)
  • db (String) (defaults to: nil)
  • result_url (String) (defaults to: nil)
  • priority (Fixnum) (defaults to: nil)
  • retry_limit (Fixnum) (defaults to: nil)
  • opts (Hash) (defaults to: {})

Returns:

  • (String)

    job_id



221
222
223
# File 'lib/td/client/api/job.rb', line 221

# Issues +q+ as a Hive job; all other arguments are forwarded unchanged to #query.
# @return [String] job_id as returned by #query
def hive_query(q, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={})
  engine = :hive
  query(q, engine, db, result_url, priority, retry_limit, opts)
end

#job_result(job_id) ⇒ Array

Parameters:

  • job_id (String)

Returns:

  • (Array)


123
124
125
126
127
128
129
130
131
132
# File 'lib/td/client/api/job.rb', line 123

# Downloads the whole msgpack result of +job_id+ and returns every decoded row.
# Empty chunks from the download are skipped.
# @return [Array] decoded rows in download order
def job_result(job_id)
  rows = []
  decoder = MessagePack::Unpacker.new
  job_result_download(job_id) do |chunk|
    next if chunk.empty?
    decoder.feed_each(chunk) { |row| rows << row }
  end
  rows
end

#job_result_each(job_id, &block) ⇒ nil

block is optional and must accept 1 argument

Parameters:

  • job_id (String)
  • block (Proc)

Returns:

  • (nil)


162
163
164
165
166
167
168
169
# File 'lib/td/client/api/job.rb', line 162

# Streams the result of +job_id+, handing each decoded row to the optional block.
# No format is passed, so job_result_download's default is used (expected to be
# msgpack, decompressed by the download helper — TODO confirm there).
# @return [nil]
def job_result_each(job_id, &block)
  decoder = MessagePack::Unpacker.new
  job_result_download(job_id) do |chunk|
    next if chunk.empty?
    decoder.feed_each(chunk, &block)
  end
  nil
end

#job_result_each_with_compr_size(job_id) ⇒ nil

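block is optional and receives 2 arguments: each unpacked result row, and the total reported by the download for that chunk (the method name suggests this is the compressed result size).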

Parameters:

  • job_id (String)
  • block (Proc)

Returns:

  • (nil)


176
177
178
179
180
181
182
183
184
185
# File 'lib/td/client/api/job.rb', line 176

# Streams the result of +job_id+, yielding each decoded row together with the
# +total+ value job_result_download passes alongside the chunk.
# The block is optional; without one the data is still downloaded and fed.
# @return [nil]
def job_result_each_with_compr_size(job_id)
  wants_rows = block_given?
  decoder = MessagePack::Unpacker.new
  job_result_download(job_id) do |chunk, total|
    next if chunk.empty?
    decoder.feed_each(chunk) do |row|
      yield row, total if wants_rows
    end
  end
  nil
end

#job_result_format(job_id, format, io = nil) ⇒ nil, String

block is optional and must accept 1 parameter

Parameters:

  • job_id (String)
  • format (String)
  • io (IO) (defaults to: nil)
  • block (Proc)

Returns:

  • (nil, String)


141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/td/client/api/job.rb', line 141

# Downloads the result of +job_id+ in +format+.
# Without +io+ the chunks are concatenated and returned as a String.
# With +io+ each chunk is written to it, the optional block receives the
# +total+ value passed with each chunk, and nil is returned.
def job_result_format(job_id, format, io=nil)
  unless io
    buffer = String.new
    job_result_download(job_id, format) { |chunk| buffer << chunk }
    return buffer
  end

  job_result_download(job_id, format) do |chunk, total|
    io.write chunk
    yield total if block_given?
  end
  nil
end

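#job_result_raw(job_id, format, io = nil) ⇒ nil, String

Without io, the downloaded chunks are concatenated and returned as a String. With io, each chunk is written to io, the optional block receives the total passed with each chunk, and nil is returned.

Parameters:

  • job_id (String)
  • format (String)
  • io (IO) (defaults to: nil)
  • block (Proc)

Returns:

  • (nil, String)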


190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/td/client/api/job.rb', line 190

# Like job_result_format, but passes false as job_result_download's third
# argument (presumably disabling automatic decoding — TODO confirm there).
# Returns the concatenated body, or nil when the chunks were written to +io+.
def job_result_raw(job_id, format, io = nil)
  if io
    job_result_download(job_id, format, false) do |chunk, total|
      io.write(chunk)
      yield total if block_given?
    end
    nil
  else
    raw = String.new
    job_result_download(job_id, format, false) do |chunk, _total|
      raw << chunk
    end
    raw
  end
end

#job_status(job_id) ⇒ String

Returns the job's current status.

Parameters:

  • job_id (String)

Returns:

  • (String)

    job status (the 'status' field of the /v3/job/status response)



111
112
113
114
115
116
117
118
119
# File 'lib/td/client/api/job.rb', line 111

# Fetches /v3/job/status for +job_id+ and returns the 'status' field of the
# response body (a required key). Non-"200" responses go to raise_error.
def job_status(job_id)
  code, body, res = get("/v3/job/status/#{e job_id}")
  raise_error("Get job status failed", res) unless code == "200"

  checked_json(body, %w[status])['status']
end

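#kill(job_id) ⇒ String

Requests that the job be killed and returns the job's status before the request (the 'former_status' field of the response).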

Parameters:

  • job_id (String)

Returns:

  • (String)


205
206
207
208
209
210
211
212
213
# File 'lib/td/client/api/job.rb', line 205

# POSTs /v3/job/kill for +job_id+ and returns the 'former_status' field of the
# response. No keys are required by checked_json, so the value may be nil when
# the server omits it. Non-"200" responses go to raise_error.
def kill(job_id)
  code, body, res = post("/v3/job/kill/#{e job_id}")
  raise_error("Kill job failed", res) unless code == "200"

  checked_json(body, %w[])['former_status']
end

#list_jobs(from = 0, to = nil, status = nil, conditions = nil) ⇒ Array

Parameters:

  • from (Fixnum) (defaults to: 0)
  • to (Fixnum) (defaults to: nil)

    (to is inclusive)

  • status (String) (defaults to: nil)
  • conditions (Hash) (defaults to: nil)

Returns:

  • (Array)


13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/td/client/api/job.rb', line 13

# Lists jobs via /v3/job/list.
#
# @param from [Fixnum] start offset (sent as a string when non-nil)
# @param to [Fixnum] end offset, inclusive
# @param status [String] status filter
# @param conditions [Hash] extra query parameters; merged last, so they
#   override 'from'/'to'/'status' when keys collide
# @return [Array<Array>] one 15-element row per job:
#   [job_id, type, status, query, start_at, end_at, cpu_time, result_size,
#    result_url, priority, retry_limit, nil, database, duration, num_records]
def list_jobs(from=0, to=nil, status=nil, conditions=nil)
  params = {}
  params['from'] = from.to_s if from
  params['to'] = to.to_s if to
  params['status'] = status.to_s if status
  params.merge!(conditions) if conditions
  code, body, res = get("/v3/job/list", params)
  if code != "200"
    raise_error("List jobs failed", res)
  end
  js = checked_json(body, %w[jobs])
  # Build each row directly with map. This avoids reassigning the `status`
  # parameter and shadowing the `query` method inside the loop.
  js['jobs'].map do |job|
    [
      job['job_id'],
      (job['type'] || '?').to_sym,
      job['status'],
      job['query'],
      job['start_at'],
      job['end_at'],
      job['cpu_time'],
      job['result_size'], # compressed result size in msgpack.gz format
      job['result'],      # result target URL
      job['priority'],
      job['retry_limit'],
      nil,                # always nil; presumably a positional placeholder — TODO confirm
      job['database'],
      job['duration'],
      job['num_records'],
    ]
  end
end

#pig_query(q, db = nil, result_url = nil, priority = nil, retry_limit = nil, opts = {}) ⇒ String

Returns job_id.

Parameters:

  • q (String)
  • db (String) (defaults to: nil)
  • result_url (String) (defaults to: nil)
  • priority (Fixnum) (defaults to: nil)
  • retry_limit (Fixnum) (defaults to: nil)
  • opts (Hash) (defaults to: {})

Returns:

  • (String)

    job_id



231
232
233
# File 'lib/td/client/api/job.rb', line 231

# Issues +q+ as a Pig job; all other arguments are forwarded unchanged to #query.
# @return [String] job_id as returned by #query
def pig_query(q, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={})
  engine = :pig
  query(q, engine, db, result_url, priority, retry_limit, opts)
end

#query(q, type = :hive, db = nil, result_url = nil, priority = nil, retry_limit = nil, opts = {}) ⇒ String

Returns job_id.

Parameters:

  • q (String)
  • type (Symbol) (defaults to: :hive)
  • db (String) (defaults to: nil)
  • result_url (String) (defaults to: nil)
  • priority (Fixnum) (defaults to: nil)
  • retry_limit (Fixnum) (defaults to: nil)
  • opts (Hash) (defaults to: {})

Returns:

  • (String)

    job_id



242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/td/client/api/job.rb', line 242

# Issues a job of engine +type+ against database +db+ via /v3/job/issue.
# Request parameters are built as follows:
# * 'query' => q comes first. +opts+ is merged on top of it, so an 'opts' entry
#   named 'query' replaces +q+.
# * result/priority/retry_limit are then set only when non-nil, overriding +opts+.
# @return [String] the new job_id
def query(q, type=:hive, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={})
  params = {'query' => q}.merge(opts)
  optional = { 'result' => result_url, 'priority' => priority, 'retry_limit' => retry_limit }
  optional.each do |key, value|
    params[key] = value if value
  end

  code, body, res = post("/v3/job/issue/#{type}/#{e db}", params)
  raise_error("Query failed", res) unless code == "200"

  checked_json(body, %w[job_id])['job_id'].to_s
end

#show_job(job_id) ⇒ Array

Parameters:

  • job_id (String)

Returns:

  • (Array)


49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/td/client/api/job.rb', line 49

# Fetches the full description of +job_id+ from /v3/job/show.
# To poll for job completion, use job_status (/v3/job/status) instead.
#
# @return [Array] [type, query, status, url, debug, start_at, end_at, cpu_time,
#   result_size, result, hive_result_schema, priority, retry_limit, nil,
#   database, duration, num_records, linked_result_export_job_id,
#   result_export_target_job_id]
# @raise [JSON::ParserError] when hive_result_schema cannot be parsed, including
#   when the Pig fallback below fails for any reason
def show_job(job_id)
  code, body, res = get("/v3/job/show/#{e job_id}")
  if code != "200"
    raise_error("Show job failed", res)
  end
  js = checked_json(body, %w[status])
  # TODO debug
  type = (js['type'] || '?').to_sym  # TODO
  database = js['database']
  query = js['query']
  status = js['status']
  debug = js['debug']
  url = js['url']
  start_at = js['start_at']
  end_at = js['end_at']
  cpu_time = js['cpu_time']
  result_size = js['result_size'] # compressed result size in msgpack.gz format
  num_records = js['num_records']
  duration = js['duration']
  result = js['result'] # result target URL
  linked_result_export_job_id = js['linked_result_export_job_id']
  result_export_target_job_id = js['result_export_target_job_id']
  hive_result_schema = (js['hive_result_schema'] || '')
  if hive_result_schema.empty?
    hive_result_schema = nil
  else
    begin
      hive_result_schema = JSON.parse(hive_result_schema)
    rescue JSON::ParserError => json_error # not `e`: that name is the URL-escape helper
      # Workaround for a known limitation of the Pig engine: it emits no default,
      # auto-generated name for anonymous columns (e.g. those produced by COUNT or SUM).
      # The schema then contains `nil` as the column name, which is not valid JSON.
      if type == :pig && hive_result_schema !~ /[\{\}]/
        begin
          # A JSON 2-dimensional array is also a Ruby array literal, so eval can read it.
          # NOTE(security): eval executes server-supplied text as Ruby code. The brace
          # check above does not prevent method calls. Consider replacing this with a
          # JSON parse after rewriting bare `nil` tokens to `null`.
          schema = eval(hive_result_schema)
          schema.each_with_index { |col_schema, idx|
            col_schema[0] = "_col#{idx}" if col_schema[0].nil?
          }
          hive_result_schema = schema
        rescue ScriptError, StandardError
          # Any failure means the format is not the one this workaround expects.
          # Surface the original JSON error, not the eval/fix-up error
          # (e.g. NameError or NoMethodError).
          raise json_error
        end
      else
        raise json_error
      end
    end
  end
  priority = js['priority']
  retry_limit = js['retry_limit']
  # The nil at index 13 is always nil here; presumably a positional placeholder — TODO confirm.
  return [type, query, status, url, debug, start_at, end_at, cpu_time,
          result_size, result, hive_result_schema, priority, retry_limit, nil, database, duration, num_records,
          linked_result_export_job_id, result_export_target_job_id]
end