Module: Amp::RevlogSupport::ChangeGroup
- Defined in: lib/amp/revlogs/changegroup.rb
Overview
This module handles changegroups, most specifically packaging up a set of revisions into a single bundle file.
Defined Under Namespace
Classes: ChangeGroupError
Constant Summary
- BUNDLE_HEADERS =
{ "" => "", "HG10UN" => "HG10UN", "HG10BZ" => "HG10", "HG10GZ" => "HG10GZ" }
- FORMAT_PRIORITIES =
BUNDLE_TYPES = ["HG10GZ", "HG10BZ", "HG10UN"]
Class Method Summary
-
.chunk_header(size) ⇒ String
Returns the encoded header for a chunk whose body is size bytes long.
-
.closing_chunk ⇒ String
The terminating chunk that indicates the end of a chunk sequence.
-
.compressor_by_type(header) ⇒ IO, ...
Returns a compressing stream based on the header for a changegroup bundle.
-
.each_chunk(source) {|chunk| ... } ⇒ Object
Loads each chunk in turn until we reach the end of the current group of chunks in the changegroup.
-
.get_chunk(source) ⇒ String
Loads a single chunk of data from a changegroup.
-
.unbundle(header, file_handle) ⇒ IO, #read
Returns a stream that will decompress the IO pointed to by file_handle, when #read is called upon it.
-
.write_bundle(changegroup, bundletype, fh = StringIO.new("", (ruby_19? ? "w+:ASCII-8BIT" : "w+"))) ⇒ IO, #write
Writes a set of changegroups to a bundle.
Class Method Details
.chunk_header(size) ⇒ String
Returns the encoded header for a chunk whose body is size bytes long: a big-endian uint32 (pack("N")) holding size + 4, because the length also counts the 4 header bytes.
    # File 'lib/amp/revlogs/changegroup.rb', line 63
    def self.chunk_header(size)
      [size + 4].pack("N")
    end
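The framing rule can be checked with core Ruby alone. This is a minimal sketch, not Amp code: frame_chunk is a hypothetical helper that prepends the same big-endian length word that chunk_header produces, so a 5-byte body gets a length of 9.

```ruby
# Hypothetical helper (not part of Amp): frame a body the way chunk_header
# expects, with a big-endian uint32 length that includes the 4 header bytes.
def frame_chunk(body)
  [body.bytesize + 4].pack("N") + body
end

framed = frame_chunk("hello")
p framed.bytes.first(4)   # => [0, 0, 0, 9]
p framed.unpack("N")[0]   # => 9
```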
.closing_chunk ⇒ String
The terminating chunk that indicates the end of a chunk sequence: four zero bytes, i.e. a length word of 0.
    # File 'lib/amp/revlogs/changegroup.rb', line 71
    def self.closing_chunk
      "\000\000\000\000" # [0].pack("N")
    end
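A small sketch of why four zero bytes end a sequence, assuming the rule that get_chunk applies (any length word of 4 or less carries no body). CLOSING_CHUNK and terminator? are hypothetical names used only for illustration.

```ruby
# Hypothetical constant mirroring closing_chunk's value.
CLOSING_CHUNK = "\x00\x00\x00\x00".b

# Hypothetical predicate mirroring get_chunk's rule: a length word of 4 or less
# carries no body, so it ends the current chunk sequence.
def terminator?(header)
  header.unpack("N")[0] <= 4
end

p CLOSING_CHUNK == [0].pack("N")   # => true
p terminator?(CLOSING_CHUNK)       # => true
p terminator?([9].pack("N"))       # => false
```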
.compressor_by_type(header) ⇒ IO, ...
Returns a compressing stream based on the header for a changegroup bundle. The bundle header specifies whether the contents are uncompressed ("HG10UN" or no header), bzip2-compressed ("HG10BZ", written as "HG10"), or zlib-compressed ("HG10GZ"). The returned stream responds to #<< and #flush: #<< takes uncompressed input, and #flush returns the compressed output produced since the last flush (for the uncompressed case, the input passes through unchanged).
    # File 'lib/amp/revlogs/changegroup.rb', line 86
    def self.compressor_by_type(header)
      case header
      when "HG10UN", ""
        # new StringIO
        result = StringIO.new "",(ruby_19? ? "w+:ASCII-8BIT" : "w+")
        # have to fix the fact that StringIO doesn't conform to the other streams,
        # and give it a #flush method. Kind of hackish.
        class << result
          def flush
            ret = self.string.dup     # get the current read-in string
            self.string.replace ""    # erase our contents
            self.rewind               # rewind the IO
            self.tell
            ret                       # return the string
          end
        end
        #return the altered StringIO
        result
      when "HG10GZ"
        # lazy-load Zlib
        require 'zlib'
        # Return a deflating stream (compressor)
        Zlib::Deflate.new
      when "HG10BZ", "HG10"
        # lazy load BZip
        need { '../../../ext/amp/bz2/bz2' }
        # Return a compressing BZip stream
        BZ2::Writer.new
      end
    end
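The #<< / #flush protocol can be tried with Ruby's standard zlib library, which is what the HG10GZ branch returns. This sketch is not taken from Amp: it feeds two pieces through a Zlib::Deflate, drains the compressed output after each one, and then calls #finish, which is what actually terminates a zlib stream (#flush defaults to Zlib::SYNC_FLUSH).

```ruby
require 'zlib'

# Feed data with #<<, drain compressed bytes with #flush, as write_bundle does
# with the HG10GZ compressor. A sync flush does not end the stream, so #finish
# is still needed to emit the final block and the Adler-32 checksum.
deflater = Zlib::Deflate.new
out = "".b
["first piece, ", "second piece"].each do |piece|
  deflater << piece          # buffer uncompressed input
  out << deflater.flush      # append compressed output produced so far
end
out << deflater.finish       # final deflate block plus the checksum trailer
deflater.close

p Zlib::Inflate.inflate(out) # => "first piece, second piece"
```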
.each_chunk(source) {|chunk| ... } ⇒ Object
Loads each chunk in turn and yields its data to the caller. Iteration stops at the first terminating chunk (or at the end of the input), so a single call consumes exactly one group of chunks and leaves any following group unread.
    # File 'lib/amp/revlogs/changegroup.rb', line 51
    def self.each_chunk(source)
      begin
        c = self.get_chunk(source)   # get a chunk
        yield c unless c.empty?      # yield if not empty
      end until c.empty?             # keep going if we have more!
    end
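Because each_chunk stops at the first terminating chunk, calling it repeatedly walks a changegroup one group at a time. The sketch below shows that with a standalone reimplementation; read_chunk and each_body are hypothetical names, and read_chunk deliberately omits get_chunk's premature-EOF check to stay short.

```ruby
require 'stringio'

# Hypothetical standalone reader (not Amp's code): returns "" at a terminating
# chunk or at end of input, like get_chunk, but skips its error checks.
def read_chunk(io)
  head = io.read(4)
  return "" if head.nil? || head.bytesize < 4
  len = head.unpack("N")[0]
  len <= 4 ? "" : io.read(len - 4).to_s
end

# Yields bodies until the first terminator, leaving later groups unread.
def each_body(io)
  loop do
    body = read_chunk(io)
    break if body.empty?
    yield body
  end
end

frame = ->(s) { [s.bytesize + 4].pack("N") + s }
closing = [0].pack("N")
stream = StringIO.new(frame.("cl1") + closing + frame.("mf1") + frame.("mf2") + closing)

first, second = [], []
each_body(stream) { |b| first << b }    # first group (e.g. changelog)
each_body(stream) { |b| second << b }   # second group (e.g. manifest)
p first    # => ["cl1"]
p second   # => ["mf1", "mf2"]
```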
.get_chunk(source) ⇒ String
Loads a single chunk of data from a changegroup. Each chunk is stored in the changegroup as:
(uint32_t, big-endian) length <-- counts these 4 bytes too; 4 or less means terminate
((length - 4) * char) body
Example (a 5-byte body, so length = 5 + 4 = 9):
00 00 00 09 h e l l o
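    # File 'lib/amp/revlogs/changegroup.rb', line 31
    def self.get_chunk(source)
      data = source.read 4
      return "" if data.nil? || data.empty?
      # a partial length word means the stream was cut off
      if data.size < 4
        raise ChangeGroupError.new("premature EOF when reading chunk header " +
                                   "(got #{data.size} bytes, expected 4)")
      end
      l = data.unpack("N")[0]
      return "" if l <= 4
      data = source.read(l - 4)
      # IO#read returns nil at end of input, so count that as 0 bytes
      got = data.nil? ? 0 : data.size
      if got < l - 4
        raise ChangeGroupError.new("premature EOF when reading changegroup " +
                                   "(got #{got} bytes, expected #{l - 4})")
      end
      return data
    end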
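A worked version of the layout above, as a standalone sketch rather than Amp's method: read_chunk! and ChunkError are hypothetical stand-ins for get_chunk and ChangeGroupError. It decodes the "hello" example and shows a truncated body being reported as an error instead of failing on a nil read.

```ruby
require 'stringio'

# Hypothetical stand-in for ChangeGroupError.
class ChunkError < StandardError; end

# Hypothetical reader following get_chunk's rules, with nil and short reads
# handled explicitly.
def read_chunk!(io)
  head = io.read(4)
  return "" if head.nil? || head.empty?
  raise ChunkError, "truncated chunk header" if head.bytesize < 4
  len = head.unpack("N")[0]
  return "" if len <= 4                 # terminating chunk
  body = io.read(len - 4)
  got = body ? body.bytesize : 0
  if got < len - 4
    raise ChunkError, "premature EOF (got #{got} bytes, expected #{len - 4})"
  end
  body
end

p read_chunk!(StringIO.new("\x00\x00\x00\x09hello".b))   # => "hello"
begin
  read_chunk!(StringIO.new("\x00\x00\x00\x09hel".b))
rescue ChunkError => e
  puts e.message   # prints: premature EOF (got 3 bytes, expected 5)
end
```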
.unbundle(header, file_handle) ⇒ IO, #read
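Returns a stream that decompresses the data in file_handle as #read is called on it. header is the 6-byte bundle header that has already been read from file_handle; if it does not start with "HG", those bytes are treated as the start of an uncompressed changegroup and are replayed in front of the stream. Note: file_handle doesn't have to be a file; any IO-like object will do.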
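    # File 'lib/amp/revlogs/changegroup.rb', line 126
    def self.unbundle(header, file_handle)
      # uncompressed? just return the input IO!
      return file_handle if header == "HG10UN"
      # if we have no header, we're uncompressed
      if !header.start_with?("HG")
        # put the 6 bytes we consumed back in front of the stream. A "w+" mode
        # would truncate the string, so open it read-only.
        headerio = StringIO.new(header, (ruby_19? ? "r:ASCII-8BIT" : "r"))
        Amp::Support::MultiIO.new(headerio, file_handle) # WOW we have legacy support already
      elsif header == "HG10GZ"
        # HG10GZ bundles are written with Zlib::Deflate (a zlib stream, not a
        # gzip file), so Zlib::GzipReader can't read them. Inflate the rest of
        # the input instead; #inflate without #finish also accepts a stream
        # that was only sync-flushed.
        require 'zlib'
        inflater = Zlib::Inflate.new
        data = inflater.inflate(file_handle.read)
        inflater.close
        StringIO.new(data, (ruby_19? ? "r:ASCII-8BIT" : "r"))
      elsif header == "HG10BZ"
        # get a BZip reader, but it has to decompress "BZ" first, since those two
        # bytes of the bzip2 stream were read as part of the header. Meh.
        headerio = StringIO.new("BZ", (ruby_19? ? "r:ASCII-8BIT" : "r"))
        input = Amp::Support::MultiIO.new(headerio, file_handle)
        BZ2::Reader.new(input)
      end
    end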
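The "no header" branch depends on replaying the six bytes that were already consumed as a header. Amp does that with Amp::Support::MultiIO, whose interface is not shown here, so this sketch uses a hypothetical ChainedReader that only implements #read(n).

```ruby
require 'stringio'

# Hypothetical stand-in for Amp::Support::MultiIO: reads from each IO in turn,
# so bytes that were already consumed (the 6-byte "header") can be replayed.
class ChainedReader
  def initialize(*ios)
    @ios = ios
  end

  # Reads up to n bytes across the chained IOs; returns nil once all are drained.
  def read(n)
    out = "".b
    while out.bytesize < n && !@ios.empty?
      piece = @ios.first.read(n - out.bytesize)
      if piece.nil? || piece.empty?
        @ios.shift                    # this IO is exhausted, move to the next
      else
        out << piece
      end
    end
    out.empty? ? nil : out
  end
end

raw = StringIO.new([7].pack("N") + "abc" + [0].pack("N"))
header = raw.read(6)                  # does not start with "HG": it is chunk data
stream = header.start_with?("HG") ? raw : ChainedReader.new(StringIO.new(header), raw)
p stream.read(4).unpack("N")[0]       # => 7
p stream.read(3)                      # => "abc"
```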
.write_bundle(changegroup, bundletype, fh = StringIO.new("", (ruby_19? ? "w+:ASCII-8BIT" : "w+"))) ⇒ IO, #write
Writes a set of changegroups to a bundle. The header for bundletype is written uncompressed, followed by the compressed chunk groups. If no IO is specified, a new StringIO is created and the bundle is written to it (i.e., to memory). The IO used is returned.
    # File 'lib/amp/revlogs/changegroup.rb', line 158
    def self.write_bundle(changegroup, bundletype, fh = StringIO.new("", (ruby_19? ? "w+:ASCII-8BIT" : "w+")))
      # rewind the changegroup to start at the beginning
      changegroup.rewind
      # pick out our header
      header = BUNDLE_HEADERS[bundletype]
      # get a compressing stream
      compressor = compressor_by_type header
      # output the header (uncompressed)
      fh.write header
      # These 2 variables are for checking to see if #changegroup has been fully
      # read in or not.
      empty = false
      count = 0
      # Always copy the changelog and manifest groups, then keep copying file
      # groups until an empty group marks the end
      while !empty || count <= 2
        # Set empty to true for this particular file (each iteration of this loop
        # represents compressing 1 file's changesets into changegroups in the bundle)
        empty = true
        # Add 1 to the number of files we've compressed
        count += 1
        # For each chunk in the changegroup (i.e. each changeset)
        inner_count = 0
        self.each_chunk(changegroup) do |chunk|
          #puts "\t\twrite_bundle inner loop count #{inner_count}"
          inner_count += 1
          empty = false
          # Compress the chunk header
          compressor << chunk_header(chunk.size)
          # Write the chunk header
          fh.write(compressor.flush)
          # compress the actual chunk 1 megabyte at a time
          step_amt = 1048576
          (0..chunk.size).step(step_amt) do |pos|
            compressor << chunk[pos..(pos+step_amt-1)]
            fh.write(compressor.flush)
            fh.flush
          end
        end
        # Compress the terminating chunk: this indicates that there are no more
        # changesets for the current file
        compressor << closing_chunk
        # Write the terminating chunk out!
        fh.write compressor.flush
        fh.flush
      end
      # Write anything left over in that there compressor. Zlib::Deflate#flush
      # only does a sync flush, so a deflate stream needs #finish to write its
      # final block and checksum.
      if defined?(Zlib::Deflate) && compressor.is_a?(Zlib::Deflate)
        fh.write compressor.finish
      else
        fh.write compressor.flush
      end
      # Kill the compressor
      compressor.close
      # Return the IO we wrote to (in case we instantiated it)
      return fh
    end
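An end-to-end sketch of the loop in write_bundle, simplified and not Amp's implementation: write_gz_bundle and read_chunk are hypothetical, compression is fixed to zlib (the HG10GZ case), and each chunk goes through Zlib::Deflate#deflate without an explicit flush. The sample changegroup holds a changelog group, a manifest group, one file group, and the empty group that ends the file section, so the loop makes four passes.

```ruby
require 'stringio'
require 'zlib'

# Hypothetical minimal chunk reader: "" at a terminator or at end of input.
def read_chunk(io)
  head = io.read(4)
  return "" if head.nil? || head.bytesize < 4
  len = head.unpack("N")[0]
  len <= 4 ? "" : io.read(len - 4).to_s
end

# Hypothetical, simplified port of write_bundle's loop: copy the changelog and
# manifest groups, then file groups until an empty group, re-framing every chunk
# into a zlib-compressed HG10GZ body.
def write_gz_bundle(changegroup, out = StringIO.new("".b))
  z = Zlib::Deflate.new
  out.write("HG10GZ")                       # the header is written uncompressed
  count = 0
  empty = false
  while !empty || count <= 2
    empty = true
    count += 1
    while !(chunk = read_chunk(changegroup)).empty?
      empty = false
      out.write(z.deflate([chunk.bytesize + 4].pack("N") + chunk))
    end
    out.write(z.deflate([0].pack("N")))     # close this group
  end
  out.write(z.finish)                       # end the zlib stream properly
  z.close
  out
end

frame = ->(s) { [s.bytesize + 4].pack("N") + s }
closing = [0].pack("N")
cg = StringIO.new(frame.("cs") + closing + frame.("mf") + closing +
                  frame.("f1") + closing + closing)
bundle = write_gz_bundle(cg).string
p bundle[0, 6]                                        # => "HG10GZ"
p Zlib::Inflate.inflate(bundle[6..-1]) == cg.string   # => true
```

Decompressing everything after the 6-byte header gives back the original changegroup bytes, which is a quick way to check that the group boundaries were preserved.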