Module: Dag::Client::API::Job

Includes:
ListParams
Included in:
Dag::Client::API
Defined in:
lib/dag/client/api/job.rb

Instance Method Summary

Methods included from ListParams

#list_params

Instance Method Details

#query(query: '', output_format: 'csv', output_resource_path: '', cluster_name: '', label: '') ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/dag/client/api/job.rb', line 81

# Submits a SELECT query job after validating its arguments locally.
#
# @param query [String] query text; must start with SELECT (case-insensitive,
#   no leading whitespace) and must not contain OVERWRITE anywhere (case-insensitive)
# @param output_format [String, nil] 'csv' or 'tsv'; nil falls back to 'csv'
# @param output_resource_path [String] must start with 'dag://' and end with '/'
# @param cluster_name [String] required, must not be blank
# @param label [String] passed through unchanged
# @return whatever #execute returns for POST /v1/ (cano_resource 'select')
# @raise [Dag::Client::ParameterInvalid] when any argument fails validation
def query(query: '', output_format: 'csv', output_resource_path: '', cluster_name: '', label: '')
  raise Dag::Client::ParameterInvalid, 'query is blank' if query.blank?

  raise Dag::Client::ParameterInvalid, 'query should start with SELECT' if query !~ /\ASELECT/i
  # Rejects the word anywhere in the text, not only in an INSERT OVERWRITE clause.
  raise Dag::Client::ParameterInvalid, 'query should not include OVERWRITE' if query =~ /OVERWRITE/i

  if output_format && !%w[csv tsv].include?(output_format)
    raise Dag::Client::ParameterInvalid, 'output_format should be csv or tsv'
  end

  raise Dag::Client::ParameterInvalid, 'output_resource_path is blank' if output_resource_path.blank?

  unless output_resource_path.start_with?('dag://')
    raise Dag::Client::ParameterInvalid, "output_resource_path should start with 'dag://'"
  end
  unless output_resource_path.end_with?('/')
    raise Dag::Client::ParameterInvalid, "output_resource_path should end with '/'"
  end

  raise Dag::Client::ParameterInvalid, 'cluster_name is blank' if cluster_name.blank?

  parameters = {
    'outputFormat' => output_format || 'csv',
    'outputResourcePath' => output_resource_path,
    'query' => query,
    'clusterName' => cluster_name,
    'label' => label,
  }

  resource = "/v1/"
  execute(RestParameter.new(:post, resource, cano_resource: 'select', content_type: 'application/json', parameters: parameters))
end

#query_cancel(job_id) ⇒ Object



76
77
78
79
# File 'lib/dag/client/api/job.rb', line 76

# Cancels a query job by issuing DELETE /v1/<job_id> (cano_resource 'query').
#
# @param job_id [#to_s] identifier of the job to cancel
# @return whatever #execute returns
# @raise [Dag::Client::ParameterInvalid] if job_id is blank
def query_cancel(job_id)
  # A blank id would turn the path into "/v1/" and send the DELETE to the
  # collection endpoint instead of a single job.
  raise Dag::Client::ParameterInvalid, 'job_id is blank' if job_id.blank?

  resource = "/v1/#{job_id}"
  execute(RestParameter.new(:delete, resource, cano_resource: 'query', content_type: 'application/json'))
end

#query_info(job_id) ⇒ Object



56
57
58
59
# File 'lib/dag/client/api/job.rb', line 56

# Fetches information about one query job via GET /v1/<job_id> (cano_resource 'query').
#
# @param job_id [#to_s] identifier of the job
# @return whatever #execute returns
# @raise [Dag::Client::ParameterInvalid] if job_id is blank
def query_info(job_id)
  # A blank id would produce GET /v1/, the same request #query_info_list makes,
  # and silently return the job list instead of a single job.
  raise Dag::Client::ParameterInvalid, 'job_id is blank' if job_id.blank?

  resource = "/v1/#{job_id}"
  execute(RestParameter.new(:get, resource, cano_resource: 'query'))
end

#query_info_list(options = {}) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/dag/client/api/job.rb', line 6

# Lists query jobs via GET /v1/ (cano_resource 'query'), with optional filters.
#
# @param options [Hash] options passed to #list_params, plus these filters:
#   :status           - one or more of running, finished, canceled, error, given as
#                       a comma-separated String (spaces allowed), a Symbol, or an Array
#   :type             - 'select' or 'split' (String or Symbol)
#   :cluster_name     - sent as clusterName
#   :label_prefix     - sent as labelPrefix
#   :cluster_rebooted - true or false; nil means the filter is omitted
#   :order            - 'asc' or 'desc' (String or Symbol)
# @return whatever #execute returns
# @raise [Dag::Client::ParameterInvalid] when a filter value is not allowed
def query_info_list(options = {})
  resource = "/v1/"
  query_params = list_params(options)

  status = options[:status]
  if status
    # Normalize every accepted form to the API's "a,b" representation.
    statuses = Array(status).flat_map { |s| s.to_s.split(',') }.map(&:strip)
    unless statuses.all? { |s| %w[running finished canceled error].include?(s) }
      raise Dag::Client::ParameterInvalid, "status is invalid: #{status}"
    end
    query_params = query_params.merge('status' => statuses.join(','))
  end

  type = options[:type]
  if type
    unless %w[select split].include?(type.to_s)
      raise Dag::Client::ParameterInvalid, "type is invalid: #{type}"
    end
    query_params = query_params.merge('type' => type.to_s)
  end

  cluster_name = options[:cluster_name]
  query_params = query_params.merge('clusterName' => cluster_name) if cluster_name

  label_prefix = options[:label_prefix]
  query_params = query_params.merge('labelPrefix' => label_prefix) if label_prefix

  cluster_rebooted = options[:cluster_rebooted]
  unless cluster_rebooted.nil?
    # Only real booleans are accepted; strings such as 'true' are rejected.
    unless [true, false].include?(cluster_rebooted)
      raise Dag::Client::ParameterInvalid, "cluster_rebooted is invalid: #{cluster_rebooted}"
    end
    query_params = query_params.merge('clusterRebooted' => cluster_rebooted)
  end

  order = options[:order]
  if order
    unless %w[asc desc].include?(order.to_s)
      raise Dag::Client::ParameterInvalid, "order is invalid: #{order}"
    end
    query_params = query_params.merge('order' => order.to_s)
  end

  execute(RestParameter.new(:get, resource, cano_resource: 'query', query_params: query_params))
end

#query_log(job_id) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/dag/client/api/job.rb', line 61

# Fetches a query job's log via GET /v1/<job_id>/log (cano_resource 'query').
# Lines containing 'CLIService' are removed from the log.
#
# @param job_id [#to_s] identifier of the job
# @return [Hash] { "log" => filtered_text } when the response is present;
#   otherwise the (blank) response unchanged. A present response without a
#   'log' value yields { "log" => "" }. Any other keys in the response are dropped.
# @raise [Dag::Client::ParameterInvalid] if job_id is blank
def query_log(job_id)
  # A blank id would produce "/v1//log".
  raise Dag::Client::ParameterInvalid, 'job_id is blank' if job_id.blank?

  resource = "/v1/#{job_id}/log"
  log_info = execute(RestParameter.new(:get, resource, cano_resource: 'query'))
  return log_info unless log_info.present?

  kept = log_info['log'].to_s.each_line.reject { |line| line.include?('CLIService') }
  # Terminate a final line that lacks "\n" so every kept line ends with a newline
  # (the output shape the previous IO#puts-based version produced).
  text = kept.map { |line| line.end_with?("\n") ? line : "#{line}\n" }.join
  { "log" => text }
end