Module: Docker::Registry::Sync::CMD
- Includes:
- Docker::Registry::Sync
- Defined in:
- lib/docker/registry/sync/s3.rb,
lib/docker/registry/sync/cmd.rb,
lib/docker/registry/sync/sqs.rb
Constant Summary
Constants included from Docker::Registry::Sync
Class Attribute Summary collapse
-
.config ⇒ Object
Returns the value of attribute config.
-
.producer_finished ⇒ Object
Returns the value of attribute producer_finished.
-
.status_queue ⇒ Object
Returns the value of attribute status_queue.
-
.threads ⇒ Object
Returns the value of attribute threads.
-
.work_queue ⇒ Object
Returns the value of attribute work_queue.
Class Method Summary collapse
- .configure(source_bucket, target_buckets, sqs_queue, use_sse, source_uses_sse, pool, log_level = :debug) ⇒ Object
- .configure_signal_handlers ⇒ Object
- .configure_workers ⇒ Object
- .finalize_message(receipt_handle) ⇒ Object
- .finalize_workers ⇒ Object
- .image_exists?(image, bucket, region) ⇒ Boolean
- .queue_sync(image, tag) ⇒ Object
- .run_sync ⇒ Object
- .send_message_batch(messages, retries = 5) ⇒ Object
- .start_workers ⇒ Object
- .sync(image, tag) ⇒ Object
- .sync_image(image_id, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object
- .sync_key_consumer ⇒ Object
- .sync_keys(target_client, target_bucket, target_sse, keys, source_bucket) ⇒ Object
- .sync_prefix(source_client, target_client, target_bucket, target_sse, prefix, source_bucket) ⇒ Object
- .sync_repo(repo, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object
- .sync_tag(image, tag, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object
Class Attribute Details
.config ⇒ Object
Returns the value of attribute config.
# Reader for the shared sync configuration object (set by .configure).
# File 'lib/docker/registry/sync/cmd.rb', line 14
def config
  @config
end
.producer_finished ⇒ Object
Returns the value of attribute producer_finished.
# Reader for the flag the producer sets when no more work will be queued.
# File 'lib/docker/registry/sync/cmd.rb', line 14
def producer_finished
  @producer_finished
end
.status_queue ⇒ Object
Returns the value of attribute status_queue.
# Reader for the queue of per-job success booleans pushed by workers.
# File 'lib/docker/registry/sync/cmd.rb', line 14
def status_queue
  @status_queue
end
.threads ⇒ Object
Returns the value of attribute threads.
# Reader for the fixed-size worker-thread slot array (extended with MonitorMixin).
# File 'lib/docker/registry/sync/cmd.rb', line 14
def threads
  @threads
end
.work_queue ⇒ Object
Returns the value of attribute work_queue.
# Reader for the queue of S3 copy-job option hashes consumed by workers.
# File 'lib/docker/registry/sync/cmd.rb', line 14
def work_queue
  @work_queue
end
Class Method Details
.configure(source_bucket, target_buckets, sqs_queue, use_sse, source_uses_sse, pool, log_level = :debug) ⇒ Object
# Parse the CLI-style bucket/queue specs and populate the module-wide
# configuration, caching it in @config.
#
# @param source_bucket [String, nil] "region:bucket" spec for the source
# @param target_buckets [String, nil] comma-separated "region:bucket[:sse]" specs
# @param sqs_queue [String, nil] "region:host/path" spec for the SQS queue
# @param use_sse [Boolean] encrypt objects written to targets
# @param source_uses_sse [Boolean] source objects are SSE-encrypted
# @param pool [Integer] worker pool size
# @param log_level [Symbol] logger level (default :debug)
def configure(source_bucket, target_buckets, sqs_queue, use_sse, source_uses_sse, pool, log_level = :debug)
  source_region, source_bucket = source_bucket.nil? ? [nil, nil] : source_bucket.split(':')
  target_buckets = target_buckets.split(',').map { |spec| spec.split(':') } unless target_buckets.nil?
  sqs_region, sqs_uri = sqs_queue.nil? ? [nil, nil] : sqs_queue.split(':')
  # Ring buffer de-duplicates images already copied during this process's lifetime.
  @synced_images = RingBuffer.new 10000
  Docker::Registry::Sync.configure do |c|
    c.source_bucket  = source_bucket
    c.source_region  = source_region
    c.target_buckets = target_buckets
    c.source_sse     = source_uses_sse
    c.sse            = use_sse
    c.sqs_region     = sqs_region
    c.pool_size      = pool
    c.sqs_url        = "https://#{sqs_uri}"
    c.log_level      = log_level
  end
  @config = Docker::Registry::Sync.config
end
.configure_signal_handlers ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/docker/registry/sync/cmd.rb', line 51 def configure_signal_handlers @terminated = false Signal.trap('INT') do @config.logger.error 'Received INT signal...' @threads.synchronize do @producer_finished = true @terminated = true @work_queue.clear end end Signal.trap('TERM') do @config.logger.error 'Received TERM signal...' @threads.synchronize do @producer_finished = true @terminated = true @work_queue.clear end end end |
.configure_workers ⇒ Object
# Build the shared worker-pool state: thread slots, work/status queues,
# and the monitor/condition used to coordinate producer and consumer.
def configure_workers
  # Fixed-size slot array; a nil slot means no worker is running there yet.
  @threads = Array.new(@config.pool_size)
  @threads.extend(MonitorMixin)           # the array doubles as the shared lock
  @threads_available = @threads.new_cond  # signalled when a slot frees up
  @work_queue = Queue.new                 # copy jobs for workers
  @status_queue = Queue.new               # per-job success booleans
  @producer_finished = false
end
.finalize_message(receipt_handle) ⇒ Object
# Delete a processed message from the SQS queue so it is not redelivered.
# (The method name and delete_message call were stripped by the doc
# extraction; restored from the method index and call sites.)
#
# @param receipt_handle [String] receipt handle from the received message
# @return [Aws::SQS::Types::DeleteMessageResult]
def finalize_message(receipt_handle)
  sqs = Aws::SQS::Client.new(region: @config.sqs_region)
  sqs.delete_message(queue_url: @config.sqs_url, receipt_handle: receipt_handle)
end
.finalize_workers ⇒ Object
# Signal the consumer that production is done, wait for the consumer and all
# worker threads, then drain the status queue.
#
# @return [Boolean] true only if every job succeeded and we were not terminated
def finalize_workers
  @threads.synchronize { @producer_finished = true }
  @consumer_thread.join
  @consumer_thread = nil
  @threads.compact.each(&:join)
  @config.logger.info "Processing job results..."
  success = true
  begin
    # One job failure is a run failure; non-blocking pop raises when empty.
    loop { success = @status_queue.pop(true) && success }
  rescue ThreadError
    @config.logger.info "Finished processing job results..."
  end
  success && !@terminated
end
.image_exists?(image, bucket, region) ⇒ Boolean
# Check whether a repository's image index object exists in the given bucket.
# Errors other than NotFound (e.g. auth failures) propagate, as before.
#
# @param image [String] repository name
# @param bucket [String] S3 bucket to probe
# @param region [String] region of the bucket
# @return [Boolean]
def image_exists?(image, bucket, region)
  s3 = Aws::S3::Client.new(region: region)
  s3.head_object(bucket: bucket, key: "registry/repositories/#{image}/_index_images")
  true
rescue Aws::S3::Errors::NotFound
  false
end
.queue_sync(image, tag) ⇒ Object
# Enqueue one sync job per configured target bucket. (The call to
# send_message_batch was stripped by the doc extraction; restored.)
#
# @param image [String] image/repo name
# @param tag [String] tag to sync
# @return [Integer] 0 when the batch was enqueued, 1 otherwise
def queue_sync(image, tag)
  msgs = @config.target_buckets.map do |region, bucket, sse|
    JSON.dump(retries: 0,
              image: image,
              tag: tag,
              source: { bucket: @config.source_bucket, region: @config.source_region },
              target: { bucket: bucket, region: region, sse: !sse.nil? })
  end
  send_message_batch(msgs) ? 0 : 1
end
.run_sync ⇒ Object
# Main worker loop: long-polls SQS for sync jobs, processes at most one
# message per iteration, and deletes the message only after a successful
# sync so failures stay on the queue for retry.
#
# Fixes vs. the extracted text: restored the stripped identifiers
# (receive_message / resp.messages / message / finalize_message), fixed
# `e.traceback` (NoMethodError) to `e.backtrace`, and the "Falied" typo.
#
# @return [Integer] exit code — 0 on clean shutdown, 1 on error
def run_sync
  ec = 1
  configure_signal_handlers
  begin
    @config.logger.info 'Polling queue for images to sync...'
    sqs = Aws::SQS::Client.new(region: @config.sqs_region)
    resp = sqs.receive_message(
      queue_url: @config.sqs_url,
      max_number_of_messages: 1,
      visibility_timeout: 900, # Give ourselves 15min to sync the image
      wait_time_seconds: 10,   # Wait a maximum of 10s for a new message
    )
    @config.logger.info "SQS returned #{resp.messages.length} new images to sync..."
    if resp.messages.length == 1
      message = resp.messages[0]
      data = JSON.load(message.body)
      @config.logger.info "Image sync data: #{data}"
      if image_exists?(data['image'], data['target']['bucket'], data['target']['region'])
        # Repo already present in the target: sync just the one tag.
        configure_workers
        start_workers
        @config.logger.info("Syncing tag: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}")
        success = sync_tag(data['image'], data['tag'], data['target']['bucket'],
                           data['target']['region'], data['target']['sse'],
                           data['source']['bucket'], data['source']['region'])
        success = finalize_workers && success
        if success
          @config.logger.info("Finished syncing tag: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}")
          finalize_message(message.receipt_handle)
        else
          @config.logger.info("Failed to sync tag, leaving on queue: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}")
        end
      else
        # Repo missing in the target: sync the whole repository.
        configure_workers
        start_workers
        success = sync_repo(data['image'], data['target']['bucket'], data['target']['region'],
                            data['target']['sse'], data['source']['bucket'], data['source']['region'])
        success = finalize_workers && success
        @config.logger.info("Syncing image: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}")
        if success
          @config.logger.info("Finished syncing image: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}")
          finalize_message(message.receipt_handle)
        else
          @config.logger.error("Failed to sync image, leaving on queue: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}")
        end
      end
    end
    ec = 0
    sleep @config.empty_queue_sleep_time unless @terminated
  rescue StandardError => e
    @config.logger.error "An unknown error occurred while monitoring queue: #{e}"
    @config.logger.error e.backtrace
    @config.logger.error 'Exiting...'
    @terminated = true
    ec = 1
    finalize_workers # make sure there are no hangers-on
  end until @terminated
  ec
end
.send_message_batch(messages, retries = 5) ⇒ Object
# Send message bodies to SQS in one batch, recursively retrying any entries
# SQS reports as failed, up to `retries` attempts.
#
# Fixes vs. the extracted text: restored the stripped name/params, fixed
# `@config.logger.Error` (NoMethodError — Logger has #error), and fixed the
# failed-entry loop which logged `msg` before resolving it from the batch id.
#
# @param messages [Array<String>] JSON-encoded message bodies
# @param retries [Integer] attempts remaining before giving up
# @return [Boolean] true when every message was enqueued
def send_message_batch(messages, retries = 5)
  if retries <= 0
    success = false
    messages.each do |msg|
      @config.logger.error "Failed to Enqueue message: #{msg}"
    end
  else
    entries = messages.map do |msg|
      @config.logger.info "Enqueuing message: #{msg}"
      # MD5 of the body doubles as a stable batch-entry id for matching failures.
      { id: Digest::MD5.hexdigest(msg), message_body: msg }
    end
    sqs = Aws::SQS::Client.new(region: @config.sqs_region)
    resp = sqs.send_message_batch(queue_url: @config.sqs_url, entries: entries)
    if resp.failed.length > 0
      rerun = resp.failed.map do |failed|
        msg = messages.find { |m| Digest::MD5.hexdigest(m) == failed.id }
        @config.logger.warn "Failed to Enqueue message, re-enqueuing: #{msg}"
        msg
      end
      sleep 1 # short sleep before trying again...
      success = send_message_batch(rerun, retries - 1)
    else
      success = true
    end
  end
  success
end
.start_workers ⇒ Object
# Spawn the single consumer thread that feeds copy jobs to the worker pool.
def start_workers
  @consumer_thread = Thread.new { sync_key_consumer }
end
.sync(image, tag) ⇒ Object
# Synchronously sync one image/tag to every configured target bucket.
#
# NOTE(review): `success` is overwritten each iteration, so only the last
# bucket's result (plus finalize_workers) determines the exit code — earlier
# failures are silently dropped. Preserved as-is; confirm whether intended.
#
# @param image [String] repository name
# @param tag [String] tag to sync
# @return [Integer] 0 on success, 1 on failure
def sync(image, tag)
  configure_signal_handlers
  configure_workers
  start_workers
  success = false
  @config.target_buckets.each do |region, bucket, sse|
    success =
      if image_exists?(image, bucket, region)
        sync_tag(image, tag, bucket, region, !sse.nil?)
      else
        sync_repo(image, bucket, region, !sse.nil?)
      end
  end
  success = finalize_workers && success
  success ? 0 : 1
end
.sync_image(image_id, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object
# Sync an image layer and its full ancestry chain from the source bucket to
# the target, skipping layers already copied this run (tracked per
# image:region:bucket in the @synced_images ring buffer).
#
# @param image_id [String] image layer id
# @param bucket [String] target bucket
# @param region [String] target region
# @param sse [Boolean] encrypt objects written to the target
# @param source_bucket [String, nil] overrides @config.source_bucket
# @param source_region [String, nil] overrides @config.source_region
def sync_image(image_id, bucket, region, sse, source_bucket = nil, source_region = nil)
  source_region ||= @config.source_region
  source_bucket ||= @config.source_bucket
  src_client = Aws::S3::Client.new(region: source_region)
  dst_client = Aws::S3::Client.new(region: region)
  ancestry_resp = src_client.get_object(bucket: source_bucket, key: "registry/images/#{image_id}/ancestry")
  # Ancestry includes self
  JSON.load(ancestry_resp.body.read).each do |image|
    marker = "#{image}:#{region}:#{bucket}"
    next if @synced_images.include? marker
    sync_prefix(src_client, dst_client, bucket, sse, "registry/images/#{image}/", source_bucket)
    @synced_images << marker
  end
end
.sync_key_consumer ⇒ Object
# Consumer loop: pulls S3 copy-job option hashes off @work_queue and runs each
# copy in a free worker-thread slot until the producer signals completion and
# the queue is drained.
def sync_key_consumer
  @config.logger.info "Starting sync consumer..."
  loop do
    # Exit only when production is finished AND all queued work is consumed.
    break if @producer_finished && @work_queue.length == 0
    t_index = nil
    # Busy-wait until at least one slot is free: nil (never used), dead
    # (status == false), or self-marked finished via thread-local 'finished'.
    begin
      sleep 0.1
      busy = @threads.select { |t| t.nil? || t.status == false || t['finished'].nil? == false }.length == 0
    end until !busy
    t_index = @threads.rindex { |t| t.nil? || t.status == false || t['finished'].nil? == false }
    begin
      # Non-blocking pop; raises ThreadError when the queue is empty.
      opts = @work_queue.pop(true)
    rescue ThreadError
      @config.logger.info "No work found on the queue, sleeping..."
      sleep 1
    else
      if opts[:key]
        # Reap the finished thread occupying the slot before reusing it.
        @threads[t_index].join unless @threads[t_index].nil?
        @threads[t_index] = Thread.new do
          @config.logger.info "Worker syncing key: #{opts[:key]}"
          target_client = Aws::S3::Client.new(region: opts[:region])
          # :region is consumed here; the rest of opts goes to copy_object.
          opts.delete :region
          success = false
          begin
            target_client.copy_object(opts)
            success = true
            @config.logger.info "Worker finished syncing key: #{opts[:key]}"
          rescue StandardError => e
            @config.logger.error "An unknown error occoured while copying object in s3: #{e}"
            @config.logger.error e.backtrace
          ensure
            # Always report a result and mark the slot reusable, even on error.
            Thread.current['finished'] = true
            @status_queue << success
          end
        end
      else
        @config.logger.info "Queued work empty: #{opts}"
      end
    end
  end
end
.sync_keys(target_client, target_bucket, target_sse, keys, source_bucket) ⇒ Object
# Build a copy_object job for each key and push it onto the work queue for
# the consumer/worker pool to execute.
#
# @param target_client [Aws::S3::Client] client for the target region
# @param target_bucket [String] destination bucket
# @param target_sse [Boolean] destination requires SSE
# @param keys [Array<String>] object keys to copy
# @param source_bucket [String] bucket the keys are copied from
def sync_keys(target_client, target_bucket, target_sse, keys, source_bucket)
  keys.each do |key|
    @config.logger.info "Syncing key #{source_bucket}/#{key} to bucket #{target_bucket}"
    job = {
      acl: 'bucket-owner-full-control',
      region: target_client.config[:region],
      bucket: target_bucket,
      key: key,
      copy_source: "#{source_bucket}/#{key}"
    }
    job[:server_side_encryption] = 'AES256' if @config.sse || target_sse
    job[:copy_source_sse_customer_algorithm] = 'AES256' if @config.source_sse
    @work_queue << job
    sleep 0.1 # throttle enqueue rate
  end
end
.sync_prefix(source_client, target_client, target_bucket, target_sse, prefix, source_bucket) ⇒ Object
# Collect every key under `prefix` in the source bucket (following
# pagination) and enqueue them for copying into the target bucket.
#
# BUG FIX: the original called `img_resp.next_page` but discarded its return
# value — Aws::PageableResponse#next_page returns the next response rather
# than mutating the receiver, so prefixes spanning more than one page looped
# on the first page forever. The response is now reassigned.
#
# @param source_client [Aws::S3::Client] client for the source region
# @param target_client [Aws::S3::Client] client for the target region
# @param target_bucket [String] destination bucket
# @param target_sse [Boolean] destination requires SSE
# @param prefix [String] key prefix to enumerate
# @param source_bucket [String] bucket to list keys from
def sync_prefix(source_client, target_client, target_bucket, target_sse, prefix, source_bucket)
  keys = []
  page = source_client.list_objects(bucket: source_bucket, prefix: prefix)
  loop do
    page.contents.each { |item| keys << item.key }
    break if page.last_page?
    page = page.next_page
  end
  sync_keys(target_client, target_bucket, target_sse, keys, source_bucket)
end
.sync_repo(repo, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object
# Sync an entire repository — its metadata prefix plus every image listed in
# its _index_images — from the source bucket to the target bucket.
# (Typo fix in the error message: "occoured" -> "occurred".)
#
# @param repo [String] repository name
# @param bucket [String] target bucket
# @param region [String] target region
# @param sse [Boolean] encrypt objects written to the target
# @param source_bucket [String, nil] overrides @config.source_bucket
# @param source_region [String, nil] overrides @config.source_region
# @return [Boolean] true on success, false if any error was raised
def sync_repo(repo, bucket, region, sse, source_bucket = nil, source_region = nil)
  source_region ||= @config.source_region
  source_bucket ||= @config.source_bucket
  s3_source = Aws::S3::Client.new(region: source_region)
  s3_target = Aws::S3::Client.new(region: region)
  begin
    rep_prefix = "registry/repositories/#{repo}/"
    sync_prefix(s3_source, s3_target, bucket, sse, rep_prefix, source_bucket)
    img_index_resp = s3_source.get_object(bucket: source_bucket, key: "registry/repositories/#{repo}/_index_images")
    JSON.load(img_index_resp.body.read).each do |image|
      sync_image(image['id'], bucket, region, sse, source_bucket, source_region)
    end
  rescue StandardError => e
    @config.logger.error "An unexpected error occurred while syncing repo #{repo}: #{e}"
    @config.logger.error e.backtrace
    false
  else
    true
  end
end
.sync_tag(image, tag, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object
# Sync a single tag: its tag metadata keys plus the tagged image and its
# ancestry. (Typo fix in the error message: "occoured" -> "occurred".)
#
# @param image [String] repository name
# @param tag [String] tag to sync
# @param bucket [String] target bucket
# @param region [String] target region
# @param sse [Boolean] encrypt objects written to the target
# @param source_bucket [String, nil] overrides @config.source_bucket
# @param source_region [String, nil] overrides @config.source_region
# @return [Boolean] true on success, false if any error was raised
def sync_tag(image, tag, bucket, region, sse, source_bucket = nil, source_region = nil)
  source_region ||= @config.source_region
  source_bucket ||= @config.source_bucket
  s3_source = Aws::S3::Client.new(region: source_region)
  s3_target = Aws::S3::Client.new(region: region)
  begin
    keys = ["tag#{tag}_json", "tag_#{tag}", '_index_images'].map do |key|
      "registry/repositories/#{image}/#{key}"
    end
    sync_keys(s3_target, bucket, sse, keys, source_bucket)
    # tag_<tag> holds the image id the tag points at.
    img_id = s3_source.get_object(bucket: source_bucket, key: "registry/repositories/#{image}/tag_#{tag}").body.read
    sync_image(img_id, bucket, region, sse, source_bucket, source_region)
  rescue StandardError => e
    @config.logger.error "An unexpected error occurred while syncing tag #{image}:#{tag}: #{e}"
    @config.logger.error e.backtrace
    false
  else
    true
  end
end