Class: ShopifyCLI::Theme::Syncer::Uploader::Bulk

Inherits:
Object
  • Object
show all
Includes:
BackoffHelper
Defined in:
lib/shopify_cli/theme/syncer/uploader/bulk.rb

Constant Summary collapse

MAX_BULK_BYTESIZE =

100KB

102_400
MAX_BULK_FILES =

10 files

10
SHUTDOWN_TIMEOUT =

20 seconds

20
SHUTDOWN_RETRY_INTERVAL =

200 milliseconds

0.2

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from BackoffHelper

#backingoff?, #backoff!, #backoff_if_near_limit!, #backoff_mutex, #initialize_backoff_helper!, #wait_for_backoff!

Constructor Details

#initialize(ctx, theme, admin_api, pool_size: 15) ⇒ Bulk

Returns a new instance of Bulk.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 22

# Builds a bulk uploader: stores its collaborators, creates the two item
# queues with their mutexes, starts the thread pool and sets up the backoff
# helper.
#
# @param ctx stored in `@ctx` and returned by `#ctx`
# @param theme stored in `@theme` and returned by `#theme`
# @param admin_api stored in `@admin_api` and returned by `#admin_api`
# @param pool_size value passed to `initialize_thread_pool!` and reused as
#   the `margin:` of the backoff helper (defaults to 15)
def initialize(ctx, theme, admin_api, pool_size: 15)
  @ctx = ctx
  @theme = theme
  @admin_api = admin_api
  @pending_items = []
  @in_progress_items = []

  # Mutex used to coordinate changes to `@pending_items`, which is shared
  # across the `@thread_pool` threads
  @pending_items_mutex = Mutex.new

  # Mutex used to coordinate changes to `@in_progress_items`, which is
  # shared across the `@thread_pool` threads
  @in_progress_items_mutex = Mutex.new

  # Initialize thread pool and the jobs
  # NOTE(review): this runs before the backoff helper is initialized below;
  # `initialize_thread_pool!` is not visible here, so confirm its jobs do not
  # rely on backoff state before that call completes.
  @thread_pool = initialize_thread_pool!(pool_size)

  # Initialize backoff helper on main thread to pause all jobs when
  # requests are reaching API rate limits
  initialize_backoff_helper!(margin: pool_size, backoff_delay: 5)
end

Instance Attribute Details

#admin_api ⇒ Object (readonly)

Returns the value of attribute admin_api.



20
21
22
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 20

# Read-only accessor: returns the value held in `@admin_api`, which the
# constructor assigns from its `admin_api` argument.
def admin_api
  @admin_api
end

#ctx ⇒ Object (readonly)

Returns the value of attribute ctx.



20
21
22
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 20

# Read-only accessor: returns the value held in `@ctx`, which the
# constructor assigns from its `ctx` argument.
def ctx
  @ctx
end

#theme ⇒ Object (readonly)

Returns the value of attribute theme.



20
21
22
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 20

# Read-only accessor: returns the value held in `@theme`, which the
# constructor assigns from its `theme` argument.
def theme
  @theme
end

Instance Method Details

#clean_in_progress_items(items) ⇒ Object



84
85
86
87
88
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 84

# Drops the given items from the in-progress list.
#
# The update happens while holding `@in_progress_items_mutex`. A new array is
# assigned to `@in_progress_items` (the previous array is not mutated), and
# every occurrence of each given item is removed.
#
# @param items [Array] items to remove from `@in_progress_items`
# @return [Array] the new content of `@in_progress_items`
def clean_in_progress_items(items)
  @in_progress_items_mutex.synchronize { @in_progress_items = @in_progress_items - items }
end

#consume_bulk_items ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 54

# Moves the next batch of items from the pending queue to the in-progress
# list and returns it.
#
# Items are taken from the front of `@pending_items` (under
# `@pending_items_mutex`) until adding the next one would exceed
# `MAX_BULK_FILES` items or `MAX_BULK_BYTESIZE` total bytes. When the batch is
# still empty, the first item is always taken even if it alone exceeds a
# limit, so it is returned on its own. The batch is then appended to
# `@in_progress_items` (under `@in_progress_items_mutex`).
#
# @return [Array(Array, Integer)] the batch and the sum of its items' `size`
def consume_bulk_items
  batch = []
  batch_bytesize = 0

  @pending_items_mutex.synchronize do
    until @pending_items.empty?
      candidate = @pending_items.first

      too_many_files = batch.size + 1 > MAX_BULK_FILES
      too_many_bytes = batch_bytesize + candidate.size > MAX_BULK_BYTESIZE
      limit_reached = too_many_files || too_many_bytes

      # A limit only stops the batch once it already holds something.
      break if limit_reached && batch.any?

      batch.push(candidate)
      batch_bytesize += candidate.size
      @pending_items.shift

      # The item that crossed a limit was taken alone; close the batch.
      break if limit_reached
    end
  end

  @in_progress_items_mutex.synchronize do
    @in_progress_items = @in_progress_items + batch
  end

  [batch, batch_bytesize]
end

#enqueue(bulk_item) ⇒ Object



45
46
47
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 45

# Appends an item to the end of the pending queue while holding
# `@pending_items_mutex`.
#
# @param bulk_item the item to append to `@pending_items`
# @return [Array] `@pending_items` itself, after the append
def enqueue(bulk_item)
  @pending_items_mutex.synchronize do
    @pending_items.push(bulk_item)
  end
end

#remaining_items ⇒ Object



90
91
92
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 90

# Lists every item that is still pending or in progress.
#
# Builds a new array with the contents of `@pending_items` followed by the
# contents of `@in_progress_items`; neither source array is modified. No
# mutex is taken here.
#
# @return [Array] pending items followed by in-progress items
def remaining_items
  [*@pending_items, *@in_progress_items]
end

#shutdown ⇒ Object



49
50
51
52
# File 'lib/shopify_cli/theme/syncer/uploader/bulk.rb', line 49

# Shuts the uploader down: calls `wait_bulk_items` first, then shuts down
# `@thread_pool`.
#
# NOTE(review): `wait_bulk_items` is defined outside this view; presumably it
# waits for queued items to be processed (likely bounded by
# `SHUTDOWN_TIMEOUT`) before the pool is stopped — confirm.
def shutdown
  wait_bulk_items
  @thread_pool.shutdown
end