Class: Pocolog::FileUpgrader
- Inherits:
-
Object
- Object
- Pocolog::FileUpgrader
- Extended by:
- Logger::Hierarchy
- Includes:
- Logger::Forward, Logger::Hierarchy
- Defined in:
- lib/pocolog/file_upgrader.rb
Overview
Class that encapsulates the logic of upgrading a file or a set of file(s)
Defined Under Namespace
Classes: StreamCopy
Instance Attribute Summary collapse
-
#converter_registry ⇒ Object
readonly
Returns the value of attribute converter_registry.
-
#out_type_resolver ⇒ Object
readonly
Returns the value of attribute out_type_resolver.
Instance Method Summary collapse
-
#can_cp?(stream_copy) ⇒ Boolean
private
Check if the input file can be copied as-is.
-
#compute_stream_copy(in_path, reporter: CLI::NullReporter.new, skip_failures: false) ⇒ Array<StreamCopy>
private
Compute the stream copy operations for each stream in the logfile.
-
#cp_logfile_and_index(in_path, out_path, reporter: CLI::NullReporter.new) ⇒ Object
private
Copies the logfile and its index to the out path.
-
#initialize(out_type_resolver) ⇒ FileUpgrader
constructor
A new instance of FileUpgrader.
-
#upgrade(in_path, out_path, reporter: CLI::NullReporter.new, skip_failures: false, reflink: false) ⇒ Object
Upgrade the data in the streams of a logfile.
Constructor Details
#initialize(out_type_resolver) ⇒ FileUpgrader
Returns a new instance of FileUpgrader.
17 18 19 20 |
# File 'lib/pocolog/file_upgrader.rb', line 17 def initialize(out_type_resolver) @out_type_resolver = out_type_resolver @converter_registry = Upgrade::ConverterRegistry.new end |
Instance Attribute Details
#converter_registry ⇒ Object (readonly)
Returns the value of attribute converter_registry.
13 14 15 |
# File 'lib/pocolog/file_upgrader.rb', line 13 def converter_registry @converter_registry end |
#out_type_resolver ⇒ Object (readonly)
Returns the value of attribute out_type_resolver.
15 16 17 |
# File 'lib/pocolog/file_upgrader.rb', line 15 def out_type_resolver @out_type_resolver end |
Instance Method Details
#can_cp?(stream_copy) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Check if the input file can be copied as-is
185 186 187 |
# File 'lib/pocolog/file_upgrader.rb', line 185 def can_cp?(stream_copy) stream_copy.all? { |s| s.copy? } end |
#compute_stream_copy(in_path, reporter: CLI::NullReporter.new, skip_failures: false) ⇒ Array<StreamCopy>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Compute the stream copy operations for each stream in the logfile
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/pocolog/file_upgrader.rb', line 152 def compute_stream_copy(in_path, reporter: CLI::NullReporter.new, skip_failures: false) in_logfile = Pocolog::Logfiles.open(in_path) in_logfile.streams.map do |in_stream| out_type = out_type_resolver.call(in_stream.type) if in_stream.empty? next(StreamCopy.new(in_stream, out_type, Upgrade::Ops::Identity.new(out_type))) end stream_ref_time = in_stream.interval_rt.first begin ops = Upgrade.compute(stream_ref_time, in_stream.type, out_type, converter_registry) rescue Upgrade::InvalidCast => e if skip_failures reporter.warn "cannot upgrade #{in_stream.name} of #{in_path}" PP.pp(e, buffer = "") buffer.split("\n").each do |line| reporter.warn line end next else raise e, "cannot upgrade #{in_stream.name} of #{in_path}: #{e.}", e.backtrace end end StreamCopy.new(in_stream, out_type, ops) end.compact ensure in_logfile.close if in_logfile end |
#cp_logfile_and_index(in_path, out_path, reporter: CLI::NullReporter.new) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Copies the logfile and its index to the out path
192 193 194 195 196 197 198 199 200 201 |
# File 'lib/pocolog/file_upgrader.rb', line 192 def cp_logfile_and_index(in_path, out_path, reporter: CLI::NullReporter.new) strategy = FileUtils.cp_cow(in_path, out_path) reporter.log "file dos not require an upgrade, copied (#{strategy})" in_idx_path = File.join(File.dirname(in_path), File.basename(in_path, '.0.log') + '.0.idx') out_idx_path = File.join(File.dirname(out_path), "#{File.basename(out_path, '.0.log')}.0.idx") if File.file?(in_idx_path) FileUtils.cp_cow(in_idx_path, out_idx_path) end end |
#upgrade(in_path, out_path, reporter: CLI::NullReporter.new, skip_failures: false, reflink: false) ⇒ Object
Upgrade the data in the streams of a logfile
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/pocolog/file_upgrader.rb', line 39 def upgrade(in_path, out_path, reporter: CLI::NullReporter.new, skip_failures: false, reflink: false) stream_copy = compute_stream_copy(in_path) if reflink && can_cp?(stream_copy) cp_logfile_and_index(in_path, out_path, reporter: reporter) return end wio = File.open(out_path, 'w+') Format::Current.write_prologue(wio) stream_ops = Array.new stream_pos = Array.new stream_types = Array.new stream_index_map = Array.new stream_copy.each do |copy_info| in_stream = copy_info.in_stream index = in_stream.index stream_ops[index] = copy_info.ops stream_pos[index] = wio.tell stream_index_map[index] = Array.new stream_types[index] = [in_stream.type.new, copy_info.out_type] if copy_info.ops.identity? reporter.log "copying #{in_stream.name}" else reporter.log "updating #{in_stream.name}" end Logfiles.write_stream_declaration( wio, index, in_stream.name, copy_info.out_type, nil, in_stream.) end report_period = 0.1 last_report = Time.now block_stream = BlockStream.open(in_path) block_stream.read_prologue interval_rt = [] while (block = block_stream.read_next_block_header) if block.kind == DATA_BLOCK index = block.stream_index ops = stream_ops[index] block_pos = wio.tell payload_header, in_marshalled_sample = block_stream.read_data_block(uncompress: false) data_header = BlockStream::DataBlockHeader.parse(payload_header) if interval_rt[index] interval_rt[index][1] = data_header.rt_time else rt_time = data_header.rt_time interval_rt[index] = [rt_time, rt_time] end if ops.identity? wio.write block.raw_data wio.write payload_header wio.write in_marshalled_sample else if data_header.compressed? in_marshalled_sample = Zlib::Inflate.inflate(in_marshalled_sample) end in_sample, out_type = stream_types[index] in_sample.from_buffer_direct(in_marshalled_sample) out_sample = out_type.new ops.call(out_sample, in_sample) out_marshalled_sample = out_sample.to_byte_array out_payload_size = out_marshalled_sample.size payload_header[-5, 4] = [out_payload_size].pack("V") block.raw_data[-4, 4] = [payload_header.size + out_payload_size].pack("V") wio.write block.raw_data wio.write payload_header wio.write out_marshalled_sample end stream_index_map[index] << block_pos << data_header.lg_time end if Time.now - last_report > report_period reporter.current = block_stream.tell last_report = Time.now end end wio.flush wio.rewind block_stream = BlockStream.new(wio) raw_stream_info = stream_pos.each_with_index.map do |stream_block_pos, stream_i| IndexBuilderStreamInfo.new( stream_block_pos, stream_index_map[stream_i] ) end stream_info = Pocolog.create_index_from_raw_info( block_stream, raw_stream_info, interval_rt: interval_rt ) File.open(Logfiles.default_index_filename(out_path), 'w') do |io| Format::Current.write_index(io, block_stream.io, stream_info) end rescue Exception FileUtils.rm_f(out_path) raise ensure wio.close if wio && !wio.closed? end |