Class: ShopifyClient::BulkRequest::Operation

Inherits:
Object
  • Object
show all
Defined in:
lib/shopify-client/bulk_request.rb

Instance Method Summary collapse

Constructor Details

#initialize(client, id) ⇒ Operation

Returns a new instance of Operation.

Parameters:



22
23
24
25
# File 'lib/shopify-client/bulk_request.rb', line 22

def initialize(client, id)
  @client = client
  @id = id
end

Instance Method Details

#call(delay: 1) {|Enumerator<Hash>| ... } ⇒ Object

Wait for the operation to complete, then download the JSONL result data which is yielded as an Enumerator to the block. The data is streamed and parsed line by line to limit memory usage.

Parameters:

  • delay (Integer) (defaults to: 1)

    delay between polling requests in seconds

Yields:

  • (Enumerator<Hash>)

    yields each parsed line of JSONL

Raises:

  • CanceledOperationError

  • ExpiredOperationError

  • FailedOperationError

  • ObsoleteOperationError



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/shopify-client/bulk_request.rb', line 39

def call(delay: 1, &block)
  url = loop do
    status, url = poll

    case status
    when 'CANCELED'
      raise CanceledOperationError
    when 'EXPIRED'
      raise ExpiredOperationError
    when 'FAILED'
      raise FailedOperationError
    when 'COMPLETED'
      break url
    else
      sleep(delay)
    end
  end

  return if url.nil?

  file = Tempfile.new(mode: 0600)

  begin
    Faraday.get(url) do |request|
      request.options.on_data = ->(chunk, _) do
        file.write(chunk)
      end
    end
    file.rewind
    block.(Enumerator.new do |y|
      file.each_line { |line| y << JSON.parse(line) }
    end)
  ensure
    file.close
    file.unlink
  end
end

#cancelObject

Cancel the bulk operation.

Raises:

  • ObsoleteOperationError

  • TimeoutError



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/shopify-client/bulk_request.rb', line 81

def cancel
  begin
    @client.graphql(<<~QUERY)
      mutation {
        bulkOperationCancel(id: "#{@id}") {
          userErrors {
            field
            message
          }
        }
      }
    QUERY
  rescue Response::GraphQLClientError => e
    return if e.response.user_errors.message?([
      /cannot be canceled when it is completed/,
    ])

    raise e
  end

  poll_until(['CANCELED', 'COMPLETED'])
end

#poll_until(statuses, timeout: 60) ⇒ Object

Poll until operation status is met.

Parameters:

  • statuses (Array<String>)

    to terminate polling on

  • timeout (Integer) (defaults to: 60)

    in seconds

Raises:

  • ObsoleteOperationError

  • TimeoutError



111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/shopify-client/bulk_request.rb', line 111

def poll_until(statuses, timeout: 60)
  Timeout.timeout(timeout) do
    loop do
      status, _ = poll

      break if statuses.any? { |expected_status| status == expected_status }
    end
  end
rescue Timeout::Error
  raise TimeoutError, 'exceeded %s seconds polling for status %s' % [
    timeout,
    statuses.join(', '),
  ]
end