Module: R10K::ContentSynchronizer
- Defined in:
- lib/r10k/content_synchronizer.rb
Class Method Summary
-
.concurrent_accept(modules, visitor, loader, pool_size, logger) ⇒ Object
Returns a Queue of the names of modules actually updated.
-
.concurrent_sync(modules, pool_size, logger) ⇒ Object
Returns a Queue of the names of modules actually updated.
- .enqueue_modules(queue, modules) ⇒ Object
- .modules_sync_queue(modules) ⇒ Object
- .modules_visit_queue(modules, visitor, loader) ⇒ Object
- .serial_accept(modules, visitor, loader) ⇒ Object
- .serial_sync(modules) ⇒ Object
-
.sync_queue(mods_queue, pool_size, logger) ⇒ Object
Returns a Queue of the names of modules actually updated.
- .sync_thread(mods_queue, logger, updated_modules) ⇒ Object
Class Method Details
.concurrent_accept(modules, visitor, loader, pool_size, logger) ⇒ Object
Returns a Queue of the names of modules actually updated
20 21 22 23 |
# File 'lib/r10k/content_synchronizer.rb', line 20

# Builds the module work queue inside a visitor pass over the loader, then
# synchronizes that queue with a pool of worker threads.
#
# @param modules the modules to enqueue for synchronization
# @param visitor object whose `visit(:puppetfile, loader)` wraps the enqueueing
# @param loader passed through to `visitor.visit`
# @param pool_size number of worker threads to start
# @param logger logger handed to the worker threads
# @return [Queue] names of the modules that were actually updated
def self.concurrent_accept(modules, visitor, loader, pool_size, logger)
  sync_queue(modules_visit_queue(modules, visitor, loader), pool_size, logger)
end
.concurrent_sync(modules, pool_size, logger) ⇒ Object
Returns a Queue of the names of modules actually updated
26 27 28 29 |
# File 'lib/r10k/content_synchronizer.rb', line 26

# Enqueues the given modules and synchronizes them with a pool of worker
# threads.
#
# @param modules the modules to enqueue for synchronization
# @param pool_size number of worker threads to start
# @param logger logger handed to the worker threads
# @return [Queue] names of the modules that were actually updated
def self.concurrent_sync(modules, pool_size, logger)
  sync_queue(modules_sync_queue(modules), pool_size, logger)
end
.enqueue_modules(queue, modules) ⇒ Object
69 70 71 72 73 74 75 |
# File 'lib/r10k/content_synchronizer.rb', line 69

# Pushes the modules onto the queue as batches (arrays of modules).
#
# Modules whose `cachedir` is `:none` are each enqueued as their own
# one-element batch. All remaining modules are grouped by `cachedir` and each
# group is enqueued as a single batch, so one worker handles every module of
# a given cachedir in sequence.
# NOTE(review): presumably this avoids concurrent access to a shared cache
# directory -- confirm against the module implementations.
#
# @param queue object receiving the batches via `<<`
# @param modules enumerable of objects responding to `cachedir`
# @return [Array] the batches that were grouped by cachedir
def self.enqueue_modules(queue, modules)
  modules_by_cachedir = modules.group_by(&:cachedir)
  modules_without_vcs_cachedir = modules_by_cachedir.delete(:none) || []

  # Wrap with an array literal rather than Kernel#Array: Array(mod) would
  # call to_ary/to_a on the module and could explode it into its members
  # instead of producing a one-element batch.
  modules_without_vcs_cachedir.each { |mod| queue << [mod] }
  modules_by_cachedir.values.each { |mods| queue << mods }
end
.modules_sync_queue(modules) ⇒ Object
63 64 65 66 67 |
# File 'lib/r10k/content_synchronizer.rb', line 63

# Creates a new Queue and fills it with the given modules.
#
# @param modules the modules to enqueue
# @return [Queue] the populated queue
def self.modules_sync_queue(modules)
  queue = Queue.new
  enqueue_modules(queue, modules)
  queue
end
.modules_visit_queue(modules, visitor, loader) ⇒ Object
55 56 57 58 59 60 61 |
# File 'lib/r10k/content_synchronizer.rb', line 55

# Creates a new Queue and fills it with the given modules from inside the
# block passed to `visitor.visit(:puppetfile, loader)`.
#
# @param modules the modules to enqueue
# @param visitor object responding to `visit`
# @param loader passed through to `visitor.visit`
# @return [Queue] the queue, populated if the visitor ran the block
def self.modules_visit_queue(modules, visitor, loader)
  queue = Queue.new
  visitor.visit(:puppetfile, loader) { enqueue_modules(queue, modules) }
  queue
end
.serial_accept(modules, visitor, loader) ⇒ Object
4 5 6 7 8 |
# File 'lib/r10k/content_synchronizer.rb', line 4

# Synchronizes the modules one after another from inside the block passed to
# `visitor.visit(:puppetfile, loader)`.
#
# @param modules the modules to synchronize
# @param visitor object responding to `visit`
# @param loader passed through to `visitor.visit`
# @return whatever `visitor.visit` returns
def self.serial_accept(modules, visitor, loader)
  visitor.visit(:puppetfile, loader) { serial_sync(modules) }
end
.serial_sync(modules) ⇒ Object
10 11 12 13 14 15 16 17 |
# File 'lib/r10k/content_synchronizer.rb', line 10

# Synchronizes each module in order and reports which ones changed.
#
# `sync` is called exactly once on every module, in iteration order; a module
# counts as updated when its `sync` returns a truthy value.
#
# @param modules enumerable of objects responding to `sync` and `name`
# @return [Array] names of the modules whose `sync` returned a truthy value
def self.serial_sync(modules)
  modules.select(&:sync).map(&:name)
end
.sync_queue(mods_queue, pool_size, logger) ⇒ Object
Returns a Queue of the names of modules actually updated
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/r10k/content_synchronizer.rb', line 32

# Starts `pool_size` worker threads that drain `mods_queue`, waits for all of
# them, and returns the names of the modules that were updated.
#
# @param mods_queue [Queue] batches of modules to synchronize
# @param pool_size [Integer] number of worker threads to start
# @param logger logger responding to `debug` and `error`
# @return [Queue] names of the modules that were actually updated
# @raise the first exception caught while waiting for the workers
def self.sync_queue(mods_queue, pool_size, logger)
  logger.debug _("Updating modules with %{pool_size} threads") % {pool_size: pool_size}
  updated_modules = Queue.new
  thread_pool = pool_size.times.map { sync_thread(mods_queue, logger, updated_modules) }
  thread_exception = nil

  # If any threads raise an exception the deployment is considered a failure.
  # In that event clear the queue, wait for other threads to finish their
  # current work, then re-raise the first exception caught.
  begin
    thread_pool.each(&:join)
    # Return the list of all modules that were actually updated
    updated_modules
  rescue => e
    logger.error _("Error during concurrent deploy of a module: %{message}") % {message: e.message}
    # Drop the pending work so the remaining workers run out of batches.
    mods_queue.clear
    # Only the first failure is kept for re-raising; later ones are just logged.
    thread_exception ||= e
    # Join the pool again so every worker has finished before we fail.
    retry
  ensure
    raise thread_exception unless thread_exception.nil?
  end
end
.sync_thread(mods_queue, logger, updated_modules) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/r10k/content_synchronizer.rb', line 77

# Starts one worker thread that pops batches of modules off `mods_queue` and
# synchronizes every module in each batch, in order.
#
# The worker exits once a non-blocking pop finds the queue empty. A failing
# module is logged and its exception is raised in the main thread.
#
# @param mods_queue [Queue] batches (arrays) of modules to synchronize
# @param logger logger responding to `debug` and `error`
# @param updated_modules [Queue] receives the name of each module whose
#   `sync` returned a truthy value
# @return [Thread] the started worker thread
def self.sync_thread(mods_queue, logger, updated_modules)
  Thread.new do
    begin
      # pop(true) is non-blocking: it raises ThreadError when the queue is
      # empty, which is what ends this loop.
      while mods = mods_queue.pop(true) do
        mods.each do |mod|
          begin
            updated = mod.sync
            updated_modules << mod.name if updated
          rescue Exception => e
            # Log every failure, whatever its class, then re-raise it unchanged.
            logger.error _("Module %{mod_name} failed to synchronize due to %{message}") % {mod_name: mod.name, message: e.message}
            raise e
          end
        end
      end
    rescue ThreadError => e
      logger.debug _("Module thread %{id} exiting: %{message}") % {message: e.message, id: Thread.current.object_id}
      Thread.exit
    rescue => e
      # Hand the failure to the main thread so the caller waiting on the
      # pool can react to it.
      Thread.main.raise(e)
    end
  end
end