Class: ShopifyCLI::Theme::Syncer::Uploader::Bulk
- Inherits:
-
Object
- Object
- ShopifyCLI::Theme::Syncer::Uploader::Bulk
- Includes:
- BackoffHelper
- Defined in:
- lib/shopify_cli/theme/syncer/uploader/bulk.rb
Constant Summary collapse
- MAX_BULK_BYTESIZE =
100KB
102_400
- MAX_BULK_FILES =
10 files
10
- SHUTDOWN_TIMEOUT =
20 seconds
20
- SHUTDOWN_RETRY_INTERVAL =
200 milliseconds
0.2
Instance Attribute Summary collapse
-
#admin_api ⇒ Object
readonly
Returns the value of attribute admin_api.
-
#ctx ⇒ Object
readonly
Returns the value of attribute ctx.
-
#theme ⇒ Object
readonly
Returns the value of attribute theme.
Instance Method Summary collapse
- #clean_in_progress_items(items) ⇒ Object
- #consume_bulk_items ⇒ Object
- #enqueue(bulk_item) ⇒ Object
-
#initialize(ctx, theme, admin_api, pool_size: 15) ⇒ Bulk
constructor
A new instance of Bulk.
- #remaining_items ⇒ Object
- #shutdown ⇒ Object
Methods included from BackoffHelper
#backingoff?, #backoff!, #backoff_if_near_limit!, #backoff_mutex, #initialize_backoff_helper!, #wait_for_backoff!
Constructor Details
#initialize(ctx, theme, admin_api, pool_size: 15) ⇒ Bulk
Returns a new instance of Bulk.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 22 def initialize(ctx, theme, admin_api, pool_size: 15) @ctx = ctx @theme = theme @admin_api = admin_api @pending_items = [] @in_progress_items = [] # Mutex used to coordinate changes in the `@pending_items` shared # accross the `@thread_pool` threads @pending_items_mutex = Mutex.new # Mutex used to coordinate changes in the `@in_progress_items` # shared accross the `@thread_pool` threads @in_progress_items_mutex = Mutex.new # Initialize thread pool and the jobs @thread_pool = initialize_thread_pool!(pool_size) # Initialize backoff helper on main thread to pause all jobs when # requests are reaching API rate limits initialize_backoff_helper!(margin: pool_size, backoff_delay: 5) end |
Instance Attribute Details
#admin_api ⇒ Object (readonly)
Returns the value of attribute admin_api.
20 21 22 |
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 20 def admin_api @admin_api end |
#ctx ⇒ Object (readonly)
Returns the value of attribute ctx.
20 21 22 |
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 20 def ctx @ctx end |
#theme ⇒ Object (readonly)
Returns the value of attribute theme.
20 21 22 |
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 20 def theme @theme end |
Instance Method Details
#clean_in_progress_items(items) ⇒ Object
84 85 86 87 88 |
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 84 def clean_in_progress_items(items) @in_progress_items_mutex.synchronize do @in_progress_items -= items end end |
#consume_bulk_items ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 54 def consume_bulk_items items = [] items_bytesize = 0 @pending_items_mutex.synchronize do has_enough_items = false has_enough_bytesize = false until has_enough_items || has_enough_bytesize || @pending_items.empty? bulk_item = @pending_items.first has_enough_items = items.size + 1 > MAX_BULK_FILES has_enough_bytesize = items_bytesize + bulk_item.size > MAX_BULK_BYTESIZE break if items.any? && (has_enough_items || has_enough_bytesize) items << bulk_item items_bytesize += bulk_item.size @pending_items.shift end end @in_progress_items_mutex.synchronize do @in_progress_items += items end [items, items_bytesize] end |
#enqueue(bulk_item) ⇒ Object
45 46 47 |
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 45 def enqueue(bulk_item) @pending_items_mutex.synchronize { @pending_items << bulk_item } end |
#remaining_items ⇒ Object
90 91 92 |
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 90 def remaining_items @pending_items + @in_progress_items end |
#shutdown ⇒ Object
49 50 51 52 |
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 49 def shutdown wait_bulk_items @thread_pool.shutdown end |