Class: Aliyun::OSS::Multipart::Download

Inherits:
Transaction
Includes:
Common::Logging
Defined in:
lib/aliyun/oss/download.rb

Overview

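A multipart download transaction. It divides the target object into parts, downloads the parts with multiple threads, and records its states in a checkpoint file so that an interrupted download can be resumed.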

Constant Summary

PART_SIZE =
10 * 1024 * 1024
READ_SIZE =
16 * 1024
NUM_THREAD =
10
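
As an illustration of how PART_SIZE could drive the split into parts, here is a minimal sketch. The helper name `divide_into_ranges` and the half-open `[start, end)` range convention are assumptions made for this example (the convention matches the checkpoint example further down); the library's own `divide_parts` is not shown on this page and may differ.

```ruby
PART_SIZE = 10 * 1024 * 1024 # 10 MiB, same value as the constant above

# Hypothetical helper: split an object of `object_size` bytes into
# half-open byte ranges [start, end) of at most `part_size` bytes each.
def divide_into_ranges(object_size, part_size = PART_SIZE)
  num_parts = (object_size + part_size - 1) / part_size # ceiling division
  (0...num_parts).map do |i|
    start = i * part_size
    {
      :number => i + 1,
      :range => [start, [start + part_size, object_size].min],
      :done => false
    }
  end
end

# A 25 MiB object yields two full parts and one shorter final part.
parts = divide_into_ranges(25 * 1024 * 1024)
parts.each { |p| puts "part #{p[:number]}: #{p[:range].inspect}" }
```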

Constants included from Common::Logging

Common::Logging::MAX_NUM_LOG, Common::Logging::ROTATE_SIZE

Instance Method Summary

Methods included from Common::Logging

#logger, set_log_file, set_log_level

Methods inherited from Common::Struct::Base

#to_s

Methods included from Common::Struct::Base::AttrHelper

#attrs

Constructor Details

#initialize(protocol, opts) ⇒ Download

Returns a new instance of Download.



# File 'lib/aliyun/oss/download.rb', line 17

def initialize(protocol, opts)
  args = opts.dup
  @protocol = protocol
  @progress = args.delete(:progress)
  @file = args.delete(:file)
  @cpt_file = args.delete(:cpt_file)
  super(args)

  @object_meta = {}
  @num_threads = options[:threads] || NUM_THREAD
  @all_mutex = Mutex.new
  @parts = []
  @todo_mutex = Mutex.new
  @todo_parts = []
end

Instance Method Details

#checkpoint ⇒ Object

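Records the current download states in a checkpoint and reports progress. The checkpoint states have the following structure: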

Examples:

states = {
  :id => 'download_id',
  :file => 'file',
  :object_meta => {
    :etag => 'xxx',
    :size => 1024
  },
  :parts => [
    {:number => 1, :range => [0, 100], :md5 => 'xxx', :done => false},
    {:number => 2, :range => [100, 200], :md5 => 'yyy', :done => true}
  ],
  :md5 => 'states_md5'
}
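
The `:md5` entry suggests that the states carry a digest of their own content, so that a damaged checkpoint file can be detected on resume. A minimal sketch of that idea follows. The helpers `seal_states` and `states_intact?` and the choice of JSON as the serialization are assumptions for illustration, not the library's actual checkpoint format.

```ruby
require 'digest'
require 'json'

# Hypothetical helper: return a copy of `states` with an :md5 entry that
# covers every other entry.
def seal_states(states)
  body = states.reject { |key, _| key == :md5 }
  body.merge(:md5 => Digest::MD5.hexdigest(JSON.generate(body)))
end

# Hypothetical helper: true if the stored :md5 still matches the content.
def states_intact?(states)
  body = states.reject { |key, _| key == :md5 }
  states[:md5] == Digest::MD5.hexdigest(JSON.generate(body))
end

states = {
  :id => 'download_id',
  :file => 'file',
  :object_meta => {:etag => 'xxx', :size => 200},
  :parts => [
    {:number => 1, :range => [0, 100], :done => false},
    {:number => 2, :range => [100, 200], :done => true}
  ]
}

# Round-trip through JSON, as a checkpoint file would.
sealed = seal_states(states)
loaded = JSON.parse(JSON.generate(sealed), :symbolize_names => true)
puts "intact after reload: #{states_intact?(loaded)}"

# Any change to the content invalidates the digest.
loaded[:parts][0][:done] = true
puts "intact after tampering: #{states_intact?(loaded)}"
```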


# File 'lib/aliyun/oss/download.rb', line 84

def checkpoint
  logger.debug("Begin make checkpoint, disable_cpt: "\
               "#{options[:disable_cpt] == true}")

  ensure_object_not_changed

  parts = sync_get_all_parts
  states = {
    :id => id,
    :file => @file,
    :object_meta => @object_meta,
    :parts => parts
  }

  # report progress
  if @progress
    done = parts.count { |p| p[:done] }
    @progress.call(done.to_f / parts.size) if done > 0
  end

  write_checkpoint(states, @cpt_file) unless options[:disable_cpt]

  logger.debug("Done make checkpoint, states: #{states}")
end
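
The progress step above calls `@progress` with the fraction of finished parts, a Float between 0 and 1, and skips the call while no part is done. The sketch below reproduces that step in isolation. The helper `report_progress` and the sample parts are hypothetical, and any object that responds to `call` would work as the callback.

```ruby
# Hypothetical helper mirroring the progress step: calls `progress` with the
# fraction of finished parts and returns that fraction, or returns nil
# without calling it when no part has finished yet.
def report_progress(parts, progress)
  done = parts.count { |p| p[:done] }
  return nil unless done > 0

  fraction = done.to_f / parts.size
  progress.call(fraction)
  fraction
end

seen = []
progress = lambda { |fraction| seen << fraction }

parts = [
  {:number => 1, :done => true},
  {:number => 2, :done => false},
  {:number => 3, :done => true},
  {:number => 4, :done => false}
]

report_progress(parts, progress)
puts format('%.0f%% of parts done', seen.last * 100)
```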

#run ⇒ Object

Run the download transaction, which includes 3 stages:

  • 1a. initiate a new download and divide the object into parts, or

  • 1b. rebuild the states of a resumed download

  • 2. download each unfinished part

  • 3. combine the downloaded parts into the final file



# File 'lib/aliyun/oss/download.rb', line 38

def run
  logger.info("Begin download, file: #{@file}, "\
              "checkpoint file: #{@cpt_file}, "\
              "threads: #{@num_threads}")

  # Rebuild transaction states from checkpoint file
  # Or initiate new transaction states
  rebuild

  # Divide the target object into parts to download by ranges
  divide_parts if @parts.empty?

  # Download each part(object range)
  @todo_parts = @parts.reject { |p| p[:done] }

  (1..@num_threads).map {
    Thread.new {
      loop {
        p = sync_get_todo_part
        break unless p
        download_part(p)
      }
    }
  }.map(&:join)

  # Combine the parts into the final file
  commit

  logger.info("Done download, file: #{@file}")
end
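
The download stage in `run` is a small worker pool: a fixed number of threads repeatedly take one unfinished part from a shared list until the list is empty. The following sketch shows that pattern on its own. The variable names and the placeholder work are hypothetical, and the body of `sync_get_todo_part` is assumed to be a mutex-protected `shift`, which this page does not confirm.

```ruby
# Shared list of unfinished parts, guarded by a mutex.
todo = (1..8).map { |n| {:number => n, :done => false} }
todo_mutex = Mutex.new

# Shared record of finished part numbers, guarded by its own mutex.
finished = []
finished_mutex = Mutex.new

workers = (1..3).map do
  Thread.new do
    loop do
      # Take the next part atomically; nil means nothing is left to do.
      part = todo_mutex.synchronize { todo.shift }
      break unless part

      # Placeholder for the real work (downloading the part's byte range).
      part[:done] = true
      finished_mutex.synchronize { finished << part[:number] }
    end
  end
end

# Wait for every worker before moving on to the commit stage.
workers.each(&:join)
puts "finished parts: #{finished.sort.inspect}"
```

Because each part is removed from the list under the mutex, no part is handed to two threads, and joining all workers guarantees that every part is finished before the parts are combined.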