Class: SalesforceChunker::Job

Inherits: Object
Defined in:
lib/salesforce_chunker/job.rb

Constant Summary

# Bulk API operations that produce downloadable result sets.
QUERY_OPERATIONS = %w[query queryAll].freeze
# Default number of seconds to sleep between batch-status polls.
DEFAULT_RETRY_SECONDS = 10
# Default number of seconds to keep polling for results (one hour).
DEFAULT_TIMEOUT_SECONDS = 60 * 60

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(connection:, object:, operation:, **options) ⇒ Job

Returns a new instance of Job.



Lines 11–21
# File 'lib/salesforce_chunker/job.rb', line 11

# Builds a Job and creates the corresponding Bulk API job remotely.
#
# @param connection [Object] client used for all Bulk API requests
# @param object [Object] passed straight through to create_job
# @param operation [String] Bulk API operation name; query operations are
#   listed in QUERY_OPERATIONS
# @param options [Hash] :logger (a ready-made Logger), :log_output (target for
#   a new Logger when :logger is absent), plus :headers and :external_id,
#   which are forwarded to create_job
def initialize(connection:, object:, operation:, **options)
  @connection = connection
  @operation = operation
  # No batch count is known yet.
  @batches_count = nil

  @log = options[:logger] || Logger.new(options[:log_output])
  @log.progname = "salesforce_chunker"
  @log.info "Creating Bulk API Job"

  # Only these two keys are relevant to job creation.
  job_options = options.slice(:headers, :external_id)
  @job_id = create_job(object, job_options)
end

Instance Attribute Details

#batches_count ⇒ Object (readonly)

Returns the value of attribute batches_count.



Lines 5–7
# File 'lib/salesforce_chunker/job.rb', line 5

# Number of batches this job expects to download; nil until it is known.
def batches_count; @batches_count; end

Instance Method Details

#close ⇒ Object



Lines 107–110
# File 'lib/salesforce_chunker/job.rb', line 107

# Marks the Bulk API job as closed by posting the new state to the job resource.
#
# @return [Object] whatever the connection's post_json returns
def close
  @connection.post_json("job/#{@job_id}/", { state: "Closed" })
end

#create_batch(payload) ⇒ Object



Lines 81–89
# File 'lib/salesforce_chunker/job.rb', line 81

# Submits a new batch to this job and returns the batch id from the response.
#
# Query operations send the payload as a plain body; the query is also logged
# on a single line. Every other operation sends the payload through post_json.
#
# @param payload [Object] query text for query operations, otherwise the
#   data handed to post_json
# @return [Object] the "id" field of the connection's response
def create_batch(payload)
  batch_path = "job/#{@job_id}/batch"
  label = @operation.capitalize

  unless QUERY_OPERATIONS.include?(@operation)
    @log.info "Creating #{label} Batch"
    return @connection.post_json(batch_path, payload)["id"]
  end

  # Collapse newlines so the query fits on a single log line.
  single_line_query = payload.gsub(/\n/, " ").strip
  @log.info "Creating #{label} Batch: \"#{single_line_query}\""
  @connection.post(batch_path, payload.to_s)["id"]
end

#download_results(**options) ⇒ Object



Lines 23–56
# File 'lib/salesforce_chunker/job.rb', line 23

# Yields every record from the job's completed batches to the given block.
#
# Polls the batch status list repeatedly. On each pass, every completed batch
# not seen before is downloaded and its records are yielded. Batches that
# report zero processed records are only marked as seen. Polling stops once the
# number of seen batches equals @batches_count. While @batches_count is nil,
# polling continues until the timeout fires.
#
# @param options [Hash]
# @option options [Numeric] :retry_seconds seconds to sleep after a pass that
#   downloaded nothing (defaults to DEFAULT_RETRY_SECONDS)
# @option options [Numeric] :timeout_seconds how long to keep polling before
#   giving up (defaults to DEFAULT_TIMEOUT_SECONDS)
# @yieldparam result [Hash] a single record, as yielded by get_batch_results
# @return [Enumerator, nil] an Enumerator when called without a block; nil when
#   the operation is not one of QUERY_OPERATIONS
# @raise [TimeoutError] when a pass downloads nothing after the timeout elapsed
def download_results(**options)
  return nil unless QUERY_OPERATIONS.include?(@operation)
  return to_enum(:download_results, **options) unless block_given?

  retry_seconds = options[:retry_seconds] || DEFAULT_RETRY_SECONDS
  timeout_seconds = options[:timeout_seconds] || DEFAULT_TIMEOUT_SECONDS
  # Measure the deadline on the monotonic clock. Wall-clock time can jump on NTP
  # corrections or manual clock changes, which would shorten or stretch the timeout.
  timeout_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds
  downloaded_batches = []

  loop do
    results_downloaded = false
    @log.info "Retrieving batch status information"
    get_completed_batches.each do |batch|
      next if downloaded_batches.include?(batch["id"])
      @log.info "Batch #{downloaded_batches.length + 1} of #{@batches_count || '?'}: " \
        "retrieving #{batch["numberRecordsProcessed"]} records"
      if batch["numberRecordsProcessed"].to_i > 0
        get_batch_results(batch["id"]) { |result| yield(result) }
        results_downloaded = true
      end
      downloaded_batches.append(batch["id"])
    end

    break if @batches_count && downloaded_batches.length == @batches_count

    # Only wait (and enforce the timeout) when this pass made no progress.
    unless results_downloaded
      if Process.clock_gettime(Process::CLOCK_MONOTONIC) > timeout_at
        raise TimeoutError, "Timeout during batch processing"
      end

      @log.info "Waiting #{retry_seconds} seconds"
      sleep(retry_seconds)
    end
  end

  @log.info "Completed"
end

#get_batch_results(batch_id) ⇒ Object



Lines 66–79
# File 'lib/salesforce_chunker/job.rb', line 66

# Yields every record contained in the result sets of a single batch.
#
# For each result id listed by retrieve_batch_results, fetches the raw body,
# parses it as JSON and yields each record after removing its "attributes" key.
#
# @param batch_id [Object] id of the batch whose results should be downloaded
# @yieldparam result [Hash] one parsed record without "attributes"
def get_batch_results(batch_id)
  result_ids = retrieve_batch_results(batch_id)
  result_ids.each do |result_id|
    raw_body = retrieve_raw_results(batch_id, result_id)

    @log.info "Parsing JSON response"
    records = JSON.parse(raw_body)

    @log.info "Yielding records"
    records.each do |record|
      # The "attributes" key is removed before the record is handed to the caller.
      record.delete("attributes")
      yield(record)
    end
  end
end

#get_batch_statuses ⇒ Object



Lines 91–93
# File 'lib/salesforce_chunker/job.rb', line 91

# Fetches the job's batch list and returns its "batchInfo" entry.
#
# @return [Object] the "batchInfo" value of the response (nil when absent)
def get_batch_statuses
  response = @connection.get_json("job/#{@job_id}/batch")
  response["batchInfo"]
end

#get_completed_batches ⇒ Object



Lines 58–64
# File 'lib/salesforce_chunker/job.rb', line 58

# Returns the batches whose state is "Completed".
#
# Statuses are checked in order, and an error is raised at the first problem
# found.
#
# @return [Array<Hash>] batch status entries in the "Completed" state
# @raise [BatchError] when a batch is in the "Failed" state
# @raise [RecordError] when a completed batch reports failed records
def get_completed_batches
  get_batch_statuses.select do |info|
    case info["state"]
    when "Failed"
      raise BatchError, "Batch failed: #{info["stateMessage"]}"
    when "Completed"
      raise RecordError, "Failed records in batch" if info["numberRecordsFailed"].to_i > 0
      true
    else
      false
    end
  end
end

#retrieve_batch_results(batch_id) ⇒ Object



Lines 95–97
# File 'lib/salesforce_chunker/job.rb', line 95

# Fetches, as parsed JSON, the result listing of one batch in this job.
#
# @param batch_id [Object] id of the batch
# @return [Object] whatever the connection's get_json returns
def retrieve_batch_results(batch_id)
  result_list_path = "job/#{@job_id}/batch/#{batch_id}/result"
  @connection.get_json(result_list_path)
end

#retrieve_raw_results(batch_id, result_id) ⇒ Object



Lines 103–105
# File 'lib/salesforce_chunker/job.rb', line 103

# Fetches one result set of a batch, unparsed, via the connection's get.
#
# @param batch_id [Object] id of the batch
# @param result_id [Object] id of the result set within that batch
# @return [Object] whatever the connection's get returns (parsed by callers as JSON)
def retrieve_raw_results(batch_id, result_id)
  result_path = "job/#{@job_id}/batch/#{batch_id}/result/#{result_id}"
  @connection.get(result_path)
end

#retrieve_results(batch_id, result_id) ⇒ Object



Lines 99–101
# File 'lib/salesforce_chunker/job.rb', line 99

# Fetches one result set of a batch through the connection's get_json.
#
# @param batch_id [Object] id of the batch
# @param result_id [Object] id of the result set within that batch
# @return [Object] whatever the connection's get_json returns
def retrieve_results(batch_id, result_id)
  result_path = "job/#{@job_id}/batch/#{batch_id}/result/#{result_id}"
  @connection.get_json(result_path)
end