Module: Docker::Registry::Sync::CMD

Includes:
Docker::Registry::Sync
Defined in:
lib/docker/registry/sync/s3.rb,
lib/docker/registry/sync/cmd.rb,
lib/docker/registry/sync/sqs.rb

Constant Summary

Constants included from Docker::Registry::Sync

VERSION

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.configObject

Returns the value of attribute config.



14
15
16
# File 'lib/docker/registry/sync/cmd.rb', line 14

def config
  @config
end

.producer_finishedObject

Returns the value of attribute producer_finished.



14
15
16
# File 'lib/docker/registry/sync/cmd.rb', line 14

def producer_finished
  @producer_finished
end

.status_queueObject

Returns the value of attribute status_queue.



14
15
16
# File 'lib/docker/registry/sync/cmd.rb', line 14

def status_queue
  @status_queue
end

.threadsObject

Returns the value of attribute threads.



14
15
16
# File 'lib/docker/registry/sync/cmd.rb', line 14

def threads
  @threads
end

.work_queueObject

Returns the value of attribute work_queue.



14
15
16
# File 'lib/docker/registry/sync/cmd.rb', line 14

def work_queue
  @work_queue
end

Class Method Details

.configure(source_bucket, target_buckets, sqs_queue, use_sse, source_uses_sse, pool, log_level = :debug) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/docker/registry/sync/cmd.rb', line 16

def configure(source_bucket, target_buckets, sqs_queue, use_sse, source_uses_sse, pool, log_level = :debug)
  unless source_bucket.nil?
    source_region, source_bucket = source_bucket.split(':')
  else
    source_region, source_bucket = nil, nil
  end

  unless target_buckets.nil?
    target_buckets = target_buckets.split(',').collect { |bucket| bucket.split(':') }
  else
    target_buckets = nil
  end

  unless sqs_queue.nil?
    sqs_region, sqs_uri = sqs_queue.split(':')
  else
    sqs_region, sqs_uri = nil, nil
  end

  @synced_images = RingBuffer.new 10000

  Docker::Registry::Sync.configure do |config|
    config.source_bucket = source_bucket
    config.source_region = source_region
    config.target_buckets = target_buckets
    config.source_sse = source_uses_sse
    config.sse = use_sse
    config.sqs_region = sqs_region
    config.pool_size = pool
    config.sqs_url = "https://#{sqs_uri}"
    config.log_level = log_level
  end
  @config = Docker::Registry::Sync.config
end

.configure_signal_handlersObject



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/docker/registry/sync/cmd.rb', line 51

def configure_signal_handlers
  @terminated = false

  Signal.trap('INT') do
    @config.logger.error 'Received INT signal...'
    @threads.synchronize do
      @producer_finished = true
      @terminated = true
      @work_queue.clear
    end
  end
  Signal.trap('TERM') do
    @config.logger.error 'Received TERM signal...'
    @threads.synchronize do
      @producer_finished = true
      @terminated = true
      @work_queue.clear
    end
  end
end

.configure_workersObject



72
73
74
75
76
77
78
79
80
81
# File 'lib/docker/registry/sync/cmd.rb', line 72

def configure_workers
  @threads = Array.new(@config.pool_size)
  @work_queue = Queue.new
  @status_queue = Queue.new

  @threads.extend(MonitorMixin)
  @threads_available = @threads.new_cond

  @producer_finished = false
end

.finalize_message(receipt_handle) ⇒ Object



39
40
41
42
43
# File 'lib/docker/registry/sync/sqs.rb', line 39

def finalize_message(receipt_handle)
  sqs = Aws::SQS::Client.new(region: @config.sqs_region)
  resp = sqs.delete_message(queue_url: @config.sqs_url,
                            receipt_handle: receipt_handle)
end

.finalize_workersObject



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/docker/registry/sync/cmd.rb', line 89

def finalize_workers
  @threads.synchronize do
    @producer_finished = true
  end
  @consumer_thread.join
  @consumer_thread = nil
  @threads.each { |t| t.join unless t.nil? }
  @config.logger.info "Processing job results..."

  success = true
  loop do
    begin
      # One job filure is a run failure
      success = @status_queue.pop(true) && success
    rescue ThreadError
      @config.logger.info "Finished processing job results..."
      break
    end
  end
  success && !@terminated
end

.image_exists?(image, bucket, region) ⇒ Boolean

Returns:

  • (Boolean)


10
11
12
13
14
15
16
17
18
19
# File 'lib/docker/registry/sync/s3.rb', line 10

def image_exists?(image, bucket, region)
  s3 = Aws::S3::Client.new(region: region)
  begin
    s3.head_object(bucket: bucket, key: "registry/repositories/#{image}/_index_images")
  rescue Aws::S3::Errors::NotFound
    false
  else
    true
  end
end

.queue_sync(image, tag) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/docker/registry/sync/cmd.rb', line 127

def queue_sync(image, tag)
  msgs = @config.target_buckets.map do |region, bucket, sse|
    JSON.dump(retries: 0,
              image: image,
              tag: tag,
              source: {
                bucket: @config.source_bucket,
                region: @config.source_region
              },
              target: {
                bucket: bucket,
                region: region,
                sse: !sse.nil?
              })
  end
  send_message_batch(msgs) ? 0 : 1
end

.run_syncObject



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/docker/registry/sync/cmd.rb', line 145

def run_sync
  ec = 1
  configure_signal_handlers
  begin
    @config.logger.info 'Polling queue for images to sync...'
    sqs = Aws::SQS::Client.new(region: @config.sqs_region)
    resp = sqs.receive_message(
      queue_url: @config.sqs_url,
      max_number_of_messages: 1,
      visibility_timeout: 900, # Give ourselves 15min to sync the image
      wait_time_seconds: 10, # Wait a maximum of 10s for a new message
    )
    @config.logger.info "SQS returned #{resp.messages.length} new images to sync..."
    if resp.messages.length == 1
      message = resp.messages[0]
      data = JSON.load(message.body)
      @config.logger.info "Image sync data:  #{data}"

      if image_exists?(data['image'], data['target']['bucket'], data['target']['region'])
        configure_workers
        start_workers
        @config.logger.info("Syncing tag: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}")
        success = sync_tag(data['image'], data['tag'], data['target']['bucket'], data['target']['region'], data['target']['sse'], data['source']['bucket'], data['source']['region'])
        success = finalize_workers && success

        if success
          @config.logger.info("Finished syncing tag: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}")
          finalize_message(message.receipt_handle)
        else
          @config.logger.info("Falied to sync tag, leaving on queue: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}")
        end
      else
        configure_workers
        start_workers
        success = sync_repo(data['image'], data['target']['bucket'], data['target']['region'], data['target']['sse'], data['source']['bucket'], data['source']['region'])
        success = finalize_workers && success
        @config.logger.info("Syncing image: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}")
        if success
          @config.logger.info("Finished syncing image: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}")
          finalize_message(message.receipt_handle)
        else
          @config.logger.error("Failed to sync image, leaving on queue: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}")
        end
      end
    end
    ec = 0
    sleep @config.empty_queue_sleep_time unless @terminated
  rescue StandardError => e
    @config.logger.error "An unknown error occurred while monitoring queue: #{e}"
    @config.logger.error e.traceback
    @config.logger.error 'Exiting...'
    @terminated = true
    ec = 1
    finalize_workers # make sure there are no hangers-on
  end until @terminated
  ec
end

.send_message_batch(messages, retries = 5) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/docker/registry/sync/sqs.rb', line 11

def send_message_batch(messages, retries = 5)
  if retries <= 0
    success = false
    messages.each do |msg|
      @config.logger.Error "Failed to Enqueue message: #{msg}"
    end
  else
    entries = messages.map do |msg|
      @config.logger.info "Enqueuing message: #{msg}"
      { id: Digest::MD5.hexdigest(msg), message_body: msg }
    end
    sqs = Aws::SQS::Client.new(region: @config.sqs_region)
    resp = sqs.send_message_batch(queue_url: @config.sqs_url,
                                  entries: entries)
    if resp.failed.length > 0
      rerun = resp.failed.map do |failed|
        @config.logger.warn "Failed to Enqueue message, re-enqueuing: #{msg}"
        messages.select { |m| Digest::MD5.hexdigest(m) == failed.id }[0]
      end
      sleep 1 # short sleep before trying again...
      success = send_message_batch(rerun, retries - 1)
    else
      success = true
    end
  end
  success
end

.start_workersObject



83
84
85
86
87
# File 'lib/docker/registry/sync/cmd.rb', line 83

def start_workers
  @consumer_thread = Thread.new do
    sync_key_consumer
  end
end

.sync(image, tag) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/docker/registry/sync/cmd.rb', line 111

def sync(image, tag)
  configure_signal_handlers
  configure_workers
  start_workers
  success = false
  @config.target_buckets.each do |region, bucket, sse|
    if image_exists?(image, bucket, region)
      success = sync_tag(image, tag, bucket, region, !sse.nil?)
    else
      success = sync_repo(image, bucket, region, !sse.nil?)
    end
  end
  success = finalize_workers && success
  success ? 0 : 1
end

.sync_image(image_id, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/docker/registry/sync/s3.rb', line 69

def sync_image(image_id, bucket, region, sse, source_bucket = nil, source_region = nil)
  source_region ||= @config.source_region
  source_bucket ||= @config.source_bucket
  s3_source = Aws::S3::Client.new(region: source_region)
  s3_target = Aws::S3::Client.new(region: region)

  ancestry_resp = s3_source.get_object(bucket: source_bucket, key: "registry/images/#{image_id}/ancestry")
  # Ancestry includes self
  JSON.load(ancestry_resp.body.read).each do |image|
    unless @synced_images.include? "#{image}:#{region}:#{bucket}"
      sync_prefix(s3_source, s3_target, bucket, sse, "registry/images/#{image}/", source_bucket)
      @synced_images << "#{image}:#{region}:#{bucket}"
    end
  end
end

.sync_key_consumerObject



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/docker/registry/sync/s3.rb', line 122

def sync_key_consumer
  @config.logger.info "Starting sync consumer..."
  loop do
    break if @producer_finished && @work_queue.length == 0
    t_index = nil

    begin
      sleep 0.1
      busy = @threads.select { |t| t.nil? || t.status == false  || t['finished'].nil? == false }.length == 0
    end until !busy
    t_index = @threads.rindex { |t| t.nil? || t.status == false || t['finished'].nil? == false }

    begin
      opts = @work_queue.pop(true)
    rescue ThreadError
      @config.logger.info "No work found on the queue, sleeping..."
      sleep 1
    else
      if opts[:key]
        @threads[t_index].join unless @threads[t_index].nil?
        @threads[t_index] = Thread.new do
          @config.logger.info "Worker syncing key: #{opts[:key]}"
          target_client = Aws::S3::Client.new(region: opts[:region])
          opts.delete :region
          success = false
          begin
            target_client.copy_object(opts)
            success = true
            @config.logger.info "Worker finished syncing key: #{opts[:key]}"
          rescue StandardError => e
            @config.logger.error "An unknown error occoured while copying object in s3: #{e}"
            @config.logger.error e.backtrace
          ensure
            Thread.current['finished'] = true
            @status_queue << success
          end
        end
      else
        @config.logger.info "Queued work empty: #{opts}"
      end
    end
  end
end

.sync_keys(target_client, target_bucket, target_sse, keys, source_bucket) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/docker/registry/sync/s3.rb', line 103

def sync_keys(target_client, target_bucket, target_sse, keys, source_bucket)
  keys.each do |key|
    @config.logger.info "Syncing key #{source_bucket}/#{key} to bucket #{target_bucket}"
    opts = {acl: 'bucket-owner-full-control',
            region: target_client.config[:region],
            bucket: target_bucket,
            key: key,
            copy_source: "#{source_bucket}/#{key}"}
    if @config.sse || target_sse
      opts[:server_side_encryption] = 'AES256'
    end
    if @config.source_sse
      opts[:copy_source_sse_customer_algorithm] = 'AES256'
    end
    @work_queue << opts
    sleep 0.1
  end
end

.sync_prefix(source_client, target_client, target_bucket, target_sse, prefix, source_bucket) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/docker/registry/sync/s3.rb', line 86

def sync_prefix(source_client, target_client, target_bucket, target_sse, prefix, source_bucket)
  keys = []
  img_resp = source_client.list_objects(bucket: source_bucket, prefix: prefix)

  loop do
    img_resp.contents.each do |item|
      keys << item.key
    end
    if img_resp.last_page?
      break
    else
      img_resp.next_page
    end
  end
  sync_keys(target_client, target_bucket, target_sse, keys, source_bucket)
end

.sync_repo(repo, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/docker/registry/sync/s3.rb', line 46

def sync_repo(repo, bucket, region, sse, source_bucket = nil, source_region = nil)
  source_region ||= @config.source_region
  source_bucket ||= @config.source_bucket
  s3_source = Aws::S3::Client.new(region: source_region)
  s3_target = Aws::S3::Client.new(region: region)

  begin
    rep_prefix = "registry/repositories/#{repo}/"
    sync_prefix(s3_source, s3_target, bucket, sse, rep_prefix, source_bucket)

    img_index_resp = s3_source.get_object(bucket: source_bucket, key: "registry/repositories/#{repo}/_index_images")
    JSON.load(img_index_resp.body.read).each do |image|
      sync_image(image['id'], bucket, region, sse, source_bucket, source_region)
    end
  rescue StandardError => e
    @config.logger.error "An unexpected error occoured while syncing repo #{repo}: #{e}"
    @config.logger.error e.backtrace
    false
  else
    true
  end
end

.sync_tag(image, tag, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/docker/registry/sync/s3.rb', line 22

def sync_tag(image, tag, bucket, region, sse, source_bucket = nil, source_region = nil)
  source_region ||= @config.source_region
  source_bucket ||= @config.source_bucket

  s3_source = Aws::S3::Client.new(region: source_region)
  s3_target = Aws::S3::Client.new(region: region)

  begin
    keys = ["tag#{tag}_json", "tag_#{tag}", '_index_images'].map do |key|
      "registry/repositories/#{image}/#{key}"
    end
    sync_keys(s3_target, bucket, sse, keys, source_bucket)

    img_id = s3_source.get_object(bucket: source_bucket, key: "registry/repositories/#{image}/tag_#{tag}").body.read
    sync_image(img_id, bucket, region, sse, source_bucket, source_region)
  rescue StandardError => e
    @config.logger.error "An unexpected error occoured while syncing tag #{image}:#{tag}: #{e}"
    @config.logger.error e.backtrace
    false
  else
    true
  end
end