Class: S3_Multi_Upload::Upload

Inherits:
Object
  • Object
show all
Defined in:
lib/s3_multi_upload.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Upload

Returns a new instance of Upload.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/s3_multi_upload.rb', line 10

# Prepares an upload: sets the AWS credentials, resolves the local file,
# creates the target bucket handle and object handle, then queues the parts.
#
# @param options [Hash] reads :access_key_id, :secret_access_key, :file,
#   :bucket and (optionally) :key; the hash itself is kept in @options
def initialize(options)
  AWS.config(
    :access_key_id     => options[:access_key_id],
    :secret_access_key => options[:secret_access_key]
  )

  @options = options
  @mutex   = Mutex.new
  @queue   = Queue.new
  @file    = Pathname.new(options[:file])

  @s3     = AWS::S3.new
  @bucket = @s3.buckets.create(options[:bucket])

  # Without an explicit :key the object is named after the file's basename.
  key     = options[:key] || @file.basename
  @object = @bucket.objects[key]

  enqueue
end

Instance Attribute Details

#bucket ⇒ Object

Returns the value of attribute bucket.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Reader for @bucket, which the constructor assigns from
# `@s3.buckets.create options[:bucket]`.
def bucket
  @bucket
end

#file ⇒ Object

Returns the value of attribute file.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Reader for @file, the Pathname the constructor builds from options[:file].
def file
  @file
end

#mutex ⇒ Object

Returns the value of attribute mutex.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Reader for @mutex, the Mutex created in the constructor; #upload uses it
# to serialize calls to progress.inc.
def mutex
  @mutex
end

#object ⇒ Object

Returns the value of attribute object.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Reader for @object, the bucket entry the constructor looks up under
# options[:key] or, when no key is given, the file's basename.
def object
  @object
end

#options ⇒ Object

Returns the value of attribute options.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Reader for @options, the options hash passed to the constructor.
def options
  @options
end

#progress ⇒ Object

Returns the value of attribute progress.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Reader for @progress.
# NOTE(review): the constructor shown here never assigns @progress, yet
# #process and #upload call progress.finish / progress.inc — presumably it is
# initialized elsewhere in lib/s3_multi_upload.rb; confirm.
def progress
  @progress
end

#queue ⇒ Object

Returns the value of attribute queue.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Reader for @queue, the Queue created in the constructor; #enqueue fills it
# with [offset, part_number] pairs and #upload drains it.
def queue
  @queue
end

#s3 ⇒ Object

Returns the value of attribute s3.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Reader for @s3, the AWS::S3 instance created in the constructor.
def s3
  @s3
end

Instance Method Details

#chunk_size ⇒ Object



40
41
42
# File 'lib/s3_multi_upload.rb', line 40

# Size of one upload part, obtained by passing the first entry of
# options[:chunk_size] (splatted, so a [value, unit] pair becomes two
# arguments) to #normalize.
def chunk_size
  size_spec = options[:chunk_size].first
  normalize(*size_spec)
end

#enqueue ⇒ Object



44
45
46
47
48
# File 'lib/s3_multi_upload.rb', line 44

# Splits the file into parts of chunk_size and pushes one
# [byte_offset, part_number] pair per part onto the queue. Part numbers
# start at 1; the last part may be shorter than chunk_size.
#
# @return [Integer] the number of parts queued
def enqueue
  part_size  = chunk_size
  part_count = (file.size.to_f / part_size).ceil

  part_count.times { |position| queue << [part_size * position, position + 1] }
end

#normalize(value, unit = nil) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/s3_multi_upload.rb', line 27

# Converts a size expressed as value + unit into a number of bytes.
#
# The unit is matched case-insensitively and may be a String or a Symbol.
# A nil (the default) or empty unit means the value is already in bytes;
# previously the default nil unit raised NoMethodError on `nil.downcase`.
#
# @param value [#to_f] the numeric amount
# @param unit [String, Symbol, nil] b/byte(s), k/kb/kilobyte(s),
#   m/mb/megabyte(s) or g/gb/gigabyte(s)
# @return [Float, nil] the size in bytes, or nil when the unit is not recognized
def normalize value, unit = nil
  case unit.to_s.downcase
  when '', 'b', 'byte', 'bytes'
    value.to_f
  when 'k', 'kb', 'kilobyte', 'kilobytes'
    value.to_f * 1024
  when 'm', 'mb', 'megabyte', 'megabytes'
    value.to_f * 1024 ** 2
  when 'g', 'gb', 'gigabyte', 'gigabytes'
    value.to_f * 1024 ** 3
  end
end

#process ⇒ Object



88
89
90
91
92
93
94
# File 'lib/s3_multi_upload.rb', line 88

# Runs the whole upload: prints a summary line, touches the progress bar
# when options[:progress_bar] is set, performs the upload, and aborts the
# process with 'upload failed' if #upload returns a falsy value.
def process
  amount, unit_name = *options[:chunk_size].first
  puts "uploading #{file} to s3://#{options[:bucket]}/#{object.key} using #{options[:threads]} threads in chunks of #{amount} #{unit_name}"

  # Calling the reader before the upload starts; its result is discarded.
  progress if options[:progress_bar]

  succeeded = upload
  abort 'upload failed' unless succeeded

  if options[:progress_bar]
    progress.finish
  end
end

#upload ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/s3_multi_upload.rb', line 50

# Uploads every queued part through a multipart upload, using
# options[:threads] worker threads that drain the queue concurrently.
#
# Each worker pops [offset, part_number] pairs without blocking and stops as
# soon as the queue reports it is empty (ThreadError). Only that error is
# treated as "no more work": any other failure propagates out of the thread
# and is re-raised by Thread#join instead of being silently swallowed.
# The previous inline `rescue nil` on a multiple assignment also parsed
# differently across Ruby versions, which could leave a stale offset/index
# in place and re-upload a part.
#
# @return whatever object.multipart_upload returns; #process treats a falsy
#   value as a failed upload
def upload
  object.multipart_upload do |multipart|
    workers = options[:threads].times.map do
      Thread.new do
        loop do
          begin
            offset, index = queue.deq(true)
          rescue ThreadError
            # Another worker took the last part; nothing left to do.
            break
          end
          next if offset.nil?

          upload_parameters = {
            :data        => file.read(chunk_size, offset),
            :part_number => index,
          }

          if options[:checksum]
            # Base64-encoded MD5 of the part, sent along as :content_md5.
            upload_parameters[:content_md5] = Digest::MD5.base64digest(upload_parameters[:data])
          end

          multipart.add_part upload_parameters

          # progress.inc is called under the mutex because several workers share the bar.
          mutex.synchronize { progress.inc } if options[:progress_bar]
        end
      end
    end

    workers.each(&:join)
  end
end