Class: ShopifyCLI::Theme::Syncer::Uploader::BulkJob
- Inherits: ShopifyCLI::ThreadPool::Job
  - Object
    - ShopifyCLI::ThreadPool::Job
      - ShopifyCLI::Theme::Syncer::Uploader::BulkJob
- Defined in: lib/shopify_cli/theme/syncer/uploader/bulk_job.rb
Constant Summary
- INTERVAL = 0.2 # 200ms
- MAX_RETRIES = 10
Instance Attribute Summary
- #admin_api ⇒ Object (readonly): Returns the value of attribute admin_api.
- #bulk ⇒ Object (readonly): Returns the value of attribute bulk.
- #ctx ⇒ Object (readonly): Returns the value of attribute ctx.
Attributes inherited from ShopifyCLI::ThreadPool::Job
Instance Method Summary
- #initialize(ctx, bulk) ⇒ BulkJob (constructor): A new instance of BulkJob.
- #perform! ⇒ Object
Methods inherited from ShopifyCLI::ThreadPool::Job
#call, #error?, #recurring?, #success?
Constructor Details
#initialize(ctx, bulk) ⇒ BulkJob
Returns a new instance of BulkJob.
# File 'lib/shopify_cli/theme/syncer/uploader/bulk_job.rb', line 17
def initialize(ctx, bulk)
  super(INTERVAL)

  @ctx = ctx
  @bulk = bulk
  @admin_api = bulk.admin_api

  # Mutex used to coordinate changes in the bulk items
  @bulk_item_mutex = Mutex.new

  # Mutex used to coordinate changes in the bulk items
  @bulk_item_mutex = Mutex.new
end
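The constructor passes INTERVAL to the parent class. As a rough illustration of how an interval-driven job could behave, the sketch below uses a hypothetical SketchJob base class. It is not the real ShopifyCLI::ThreadPool::Job, whose internals are not shown on this page. Only the method names call, recurring?, success?, and error? come from the inherited-methods list above; their bodies, the interval reader, and the driver loop are assumptions made for illustration.

```ruby
# Hypothetical stand-in for ShopifyCLI::ThreadPool::Job (assumed behavior,
# not the library's implementation).
class SketchJob
  attr_reader :interval, :error

  def initialize(interval = nil)
    @interval = interval
    @error = nil
    @success = false
  end

  # Subclasses supply the actual work.
  def perform!
    raise NotImplementedError
  end

  # Runs perform! once and records whether it succeeded or raised.
  def call
    perform!
    @error = nil
    @success = true
  rescue StandardError => e
    @error = e
    @success = false
  end

  # Assumption: a job is recurring when it was given an interval.
  def recurring?
    !@interval.nil?
  end

  def success?
    @success
  end

  def error?
    !@error.nil?
  end
end

# Mirrors the shape of BulkJob#initialize: the interval goes to super.
class SketchBulkJob < SketchJob
  INTERVAL = 0.2 # 200ms

  attr_reader :runs

  def initialize
    super(INTERVAL)
    @runs = 0
  end

  def perform!
    @runs += 1
  end
end

# Hypothetical driver: re-run the job, pausing for its interval in between.
job = SketchBulkJob.new
3.times do
  job.call
  sleep(job.interval) if job.recurring?
end
```

In this sketch the pause between runs lives in the driver loop. Where the real thread pool places that delay is not documented here.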
Instance Attribute Details
#admin_api ⇒ Object (readonly)
Returns the value of attribute admin_api.
# File 'lib/shopify_cli/theme/syncer/uploader/bulk_job.rb', line 15
def admin_api
  @admin_api
end
#bulk ⇒ Object (readonly)
Returns the value of attribute bulk.
# File 'lib/shopify_cli/theme/syncer/uploader/bulk_job.rb', line 15
def bulk
  @bulk
end
#ctx ⇒ Object (readonly)
Returns the value of attribute ctx.
# File 'lib/shopify_cli/theme/syncer/uploader/bulk_job.rb', line 15
def ctx
  @ctx
end
Instance Method Details
#perform! ⇒ Object
# File 'lib/shopify_cli/theme/syncer/uploader/bulk_job.rb', line 31
def perform!
  bulk.wait_for_backoff!

  # Fetch bulk items
  bulk_items, bulk_size = bulk.consume_bulk_items
  return if bulk_items.empty?

  # Perform bulk request
  log("job request: size=#{bulk_items.size}, bytesize=#{bulk_size}")
  bulk_status, bulk_body, response = rest_request(bulk_items)
  bulk.backoff_if_near_limit!(response)
  log("job response: http_status=#{bulk_status}")

  # Abort execution when a fatal error happens
  return stable_flag_suggestion! if bulk_status != 207

  # Handle item responses
  responses = parse_responses(bulk_body)
  responses
    .each_with_index do |tuple, index|
      status, body = tuple
      bulk_item = bulk_items[index]
      handle_item_response(bulk_item, status, body, response)
    end
ensure
  bulk.clean_in_progress_items(bulk_items)
end
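perform! treats any status other than 207 (Multi-Status) as fatal, and otherwise pairs each per-item response with the bulk item at the same index. A minimal sketch of that pairing follows. The helpers parse_responses and handle_item_response are not shown on this page, so the JSON payload shape (a "results" array with "code" and "body" keys), the sketch_ method names, and the :ok / :failed outcomes are all hypothetical.

```ruby
require "json"

# Hypothetical parser: assumes the bulk body is JSON with a "results" array
# of { "code" => Integer, "body" => Hash } entries. The real payload may differ.
def sketch_parse_responses(bulk_body)
  JSON.parse(bulk_body).fetch("results").map do |result|
    [result["code"], result["body"]]
  end
end

# Returns :fatal for a non-207 bulk status; otherwise maps each item to an
# outcome, matching responses to items by position as perform! does.
def sketch_handle_bulk(bulk_items, bulk_status, bulk_body)
  return :fatal if bulk_status != 207

  outcomes = {}
  sketch_parse_responses(bulk_body).each_with_index do |(status, body), index|
    bulk_item = bulk_items[index]
    # Assumption: 200 means the item succeeded; anything else is a failure.
    outcomes[bulk_item] = status == 200 ? :ok : [:failed, body]
  end
  outcomes
end

# Usage with made-up item names and a made-up payload.
items = ["layout/theme.liquid", "assets/app.js"]
payload = JSON.generate(
  "results" => [
    { "code" => 200, "body" => {} },
    { "code" => 422, "body" => { "errors" => "invalid" } },
  ]
)
outcomes = sketch_handle_bulk(items, 207, payload)
```

Matching by index only works if the server returns one response per item in request order, which perform! appears to rely on.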