Module: Dag::Client::API::Job
Instance Method Summary
- #query(query: '', output_format: 'csv', output_resource_path: '', cluster_name: '', label: '') ⇒ Object
- #query_cancel(job_id) ⇒ Object
- #query_info(job_id) ⇒ Object
- #query_info_list(options = {}) ⇒ Object
- #query_log(job_id) ⇒ Object
Methods included from ListParams
Instance Method Details
#query(query: '', output_format: 'csv', output_resource_path: '', cluster_name: '', label: '') ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/dag/client/api/job.rb', line 81
#
# Validates the arguments and submits a SELECT query as a POST to "/v1/".
#
# @param query [String] statement to run; must begin with SELECT
#   (case-insensitive) and must not contain OVERWRITE
# @param output_format [String, nil] 'csv' or 'tsv'; a nil/false value is
#   sent as 'csv'
# @param output_resource_path [String] must start with 'dag://' and end
#   with '/'
# @param cluster_name [String] must not be blank
# @param label [String] forwarded unchanged as the 'label' parameter
# @return [Object] whatever +execute+ returns for the request
# @raise [Dag::Client::ParameterInvalid] when any validation above fails
def query(query: '', output_format: 'csv', output_resource_path: '', cluster_name: '', label: '')
  raise Dag::Client::ParameterInvalid, 'query is blank' if query.blank?
  raise Dag::Client::ParameterInvalid, 'query should start with SELECT' if query !~ /\ASELECT/i
  raise Dag::Client::ParameterInvalid, 'query should not include OVERWRITE' if query =~ /OVERWRITE/i

  if output_format && !%w[csv tsv].include?(output_format)
    # Message names the real keyword (the previous text misspelled it).
    raise Dag::Client::ParameterInvalid, 'output_format should be csv or tsv'
  end

  raise Dag::Client::ParameterInvalid, 'output_resource_path is blank' if output_resource_path.blank?

  unless output_resource_path.start_with?('dag://')
    raise Dag::Client::ParameterInvalid, "output_resource_path should start with 'dag://'"
  end

  unless output_resource_path.end_with?('/')
    raise Dag::Client::ParameterInvalid, "output_resource_path should end with '/'"
  end

  raise Dag::Client::ParameterInvalid, 'cluster_name is blank' if cluster_name.blank?

  parameters = {
    'outputFormat' => output_format || 'csv',
    'outputResourcePath' => output_resource_path,
    'query' => query,
    'clusterName' => cluster_name,
    'label' => label,
  }

  resource = "/v1/"
  execute(RestParameter.new(:post, resource, cano_resource: 'select', content_type: 'application/json', parameters: parameters))
end
#query_cancel(job_id) ⇒ Object
76 77 78 79 |
# File 'lib/dag/client/api/job.rb', line 76
#
# Issues a DELETE request for "/v1/<job_id>" through +execute+.
#
# @param job_id [Object] interpolated into the resource path
# @return [Object] whatever +execute+ returns for the request
def query_cancel(job_id)
  request = RestParameter.new(
    :delete,
    "/v1/#{job_id}",
    cano_resource: 'query',
    content_type: 'application/json'
  )
  execute(request)
end
#query_info(job_id) ⇒ Object
56 57 58 59 |
# File 'lib/dag/client/api/job.rb', line 56
#
# Issues a GET request for "/v1/<job_id>" through +execute+.
#
# @param job_id [Object] interpolated into the resource path
# @return [Object] whatever +execute+ returns for the request
def query_info(job_id)
  request = RestParameter.new(:get, "/v1/#{job_id}", cano_resource: 'query')
  execute(request)
end
#query_info_list(options = {}) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/dag/client/api/job.rb', line 6
#
# Builds the query-string parameters for a job listing and issues a GET
# request for "/v1/" through +execute+.
#
# The base parameters come from +list_params(options)+ (included from
# ListParams; the keys it reads are not visible in this file section).
#
# @param options [Hash] optional filters
# @option options [String] :status comma-separated list; every entry must be
#   one of running, finished, canceled, error
# @option options [String, Symbol] :type 'select' or 'split' (compared via to_s)
# @option options [Object] :cluster_name sent as 'clusterName'
# @option options [Object] :label_prefix sent as 'labelPrefix'
# @option options [Boolean] :cluster_rebooted must be exactly true or false
# @option options [String] :order 'asc' or 'desc'
# @return [Object] whatever +execute+ returns for the request
# @raise [Dag::Client::ParameterInvalid] when a supplied option fails the
#   checks above
def query_info_list(options = {})
  resource = "/v1/"
  query_params = list_params(options)

  status = options[:status]
  if status
    statuses = status.split(',')
    if statuses.any? { |s| !%w[running finished canceled error].include?(s) }
      raise Dag::Client::ParameterInvalid, "status is invalid: #{status}"
    end

    query_params = query_params.merge('status' => status)
  end

  type = options[:type]
  if type
    unless %w[select split].include?(type.to_s)
      raise Dag::Client::ParameterInvalid, "type is invalid: #{type}"
    end

    # The value is forwarded as given (not stringified).
    query_params = query_params.merge('type' => type)
  end

  cluster_name = options[:cluster_name]
  query_params = query_params.merge('clusterName' => cluster_name) if cluster_name

  label_prefix = options[:label_prefix]
  query_params = query_params.merge('labelPrefix' => label_prefix) if label_prefix

  cluster_rebooted = options[:cluster_rebooted]
  # nil means "not supplied"; false is a legitimate value and must be sent.
  unless cluster_rebooted.nil?
    unless [true, false].include?(cluster_rebooted)
      raise Dag::Client::ParameterInvalid, "cluster_rebooted is invalid: #{cluster_rebooted}"
    end

    query_params = query_params.merge('clusterRebooted' => cluster_rebooted)
  end

  order = options[:order]
  if order
    unless %w[asc desc].include?(order)
      raise Dag::Client::ParameterInvalid, "order is invalid: #{order}"
    end

    query_params = query_params.merge('order' => order)
  end

  execute(RestParameter.new(:get, resource, cano_resource: 'query', query_params: query_params))
end
#query_log(job_id) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/dag/client/api/job.rb', line 61
#
# Fetches "/v1/<job_id>/log" through +execute+ and drops every log line that
# contains the text 'CLIService'.
#
# @param job_id [Object] interpolated into the resource path
# @return [Hash, Object] +{ "log" => filtered_text }+ when the response is
#   present and has a 'log' entry; otherwise the response exactly as
#   +execute+ returned it
def query_log(job_id)
  resource = "/v1/#{job_id}/log"
  log_info = execute(RestParameter.new(:get, resource, cano_resource: 'query'))
  return log_info unless log_info.present?

  raw_log = log_info['log']
  # A response without a 'log' entry used to fail on nil.each_line; hand the
  # response back untouched instead.
  return log_info if raw_log.nil?

  io = StringIO.new
  raw_log.each_line do |line|
    # puts appends a newline only when the line does not already end in one.
    io.puts line unless line.include?('CLIService')
  end

  { "log" => io.string }
end