Class: ObservationCompiler::Job
- Inherits: Object
- Class hierarchy: Object → ObservationCompiler::Job
- Defined in:
- lib/logbox/observation_compiler.rb
Constant Summary collapse
- DEFAULT_KEY_FILE =
'/etc/s3_key.yml'
Instance Method Summary collapse
- #aws_keys ⇒ Object
- #copy_processed_logs_to_working_folder(date_range) ⇒ Object
- #create_working_folders ⇒ Object
- #download_raw_logs_to_working_folder(date) ⇒ Object
- #fetch_and_merge(raw_date_range) ⇒ Object
-
#initialize(options = {}) ⇒ Job
constructor
A new instance of Job.
- #log(msg) ⇒ Object
- #merge_raw_into_processed(date) ⇒ Object
- #move_processed_back(date_range) ⇒ Object
- #processed_log_name(date) ⇒ Object
- #processed_working_path ⇒ Object
- #raw_log_already_processed?(log_file_name) ⇒ Boolean
- #raw_log_paths(date) ⇒ Object
- #raw_logs_prefix(date) ⇒ Object
- #raw_working_path ⇒ Object
- #remove_raw_logs(date) ⇒ Object
- #remove_working_path ⇒ Object
- #secret_access_key_from_keychain(key_id) ⇒ Object
-
#secret_access_key_from_keychain!(key_id) ⇒ Object
These two methods are borrowed from Awsborn.
- #sort_processed(date_range) ⇒ Object
- #unzip_processed(date_range) ⇒ Object
- #unzip_raw(date) ⇒ Object
- #zip_processed(date_range) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Job
Returns a new instance of Job.
# File 'lib/logbox/observation_compiler.rb', line 15

# Builds a compiler job. (The rendered source had lost the parameter
# name; restored here as +options+.)
#
# options - Hash with optional keys:
#           :raw_logs_bucket     - S3 bucket holding raw logs (default "rwdata-logs")
#           :raw_logs_prefix     - key prefix of raw log files (default "observer-log-")
#           :processed_logs_path - local folder for processed logs (default "local_files")
#           :working_path        - scratch folder (default under /apps/smartass/tmp or /tmp)
def initialize(options = {})
  @raw_logs_bucket = options[:raw_logs_bucket] || "rwdata-logs"
  @raw_logs_prefix = options[:raw_logs_prefix] || "observer-log-"
  @processed_logs_path = options[:processed_logs_path] || "local_files"
  # Prefer the deployed app's tmp dir; fall back to the system tmp.
  temp_dir = File.exist?("/apps/smartass") ? "/apps/smartass/tmp/" : "/tmp/"
  # NOTE: ||= also writes the default back into the caller's hash (kept as-is).
  @working_path = options[:working_path] ||= "#{temp_dir}observation_compiler/#{Process.pid}"
end
Instance Method Details
#aws_keys ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 203

# Returns [access_key_id, secret_access_key] for S3.
# Reads DEFAULT_KEY_FILE when present; otherwise falls back to the
# OBSENTER_S3_KEY environment variable plus the keychain lookup.
def aws_keys
  # FIX: File.exists? is deprecated and removed in Ruby 3.2; use File.exist?.
  if File.exist? DEFAULT_KEY_FILE
    hash = YAML.load_file(DEFAULT_KEY_FILE)
    [hash[:access_key_id], hash[:secret_access_key]]
  else
    access_key_id = ENV['OBSENTER_S3_KEY']
    secret_access_key = secret_access_key_from_keychain!(access_key_id)
    [access_key_id, secret_access_key]
  end
end
#copy_processed_logs_to_working_folder(date_range) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 74

# Copies any existing gzipped processed logs for the given dates from
# @processed_logs_path into the working folder so they can be merged into.
def copy_processed_logs_to_working_folder(date_range)
  date_range.each do |date|
    gz_name = "#{processed_log_name(date)}.gz"
    src = File.join(@processed_logs_path, gz_name)
    next unless File.exist?(src)
    log "Copying #{gz_name} to working folder"
    FileUtils.copy(src, File.join(processed_working_path, gz_name))
  end
end
#create_working_folders ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 45

# Creates the raw ("r") and processed ("p") scratch directories.
def create_working_folders
  [raw_working_path, processed_working_path].each { |dir| FileUtils.mkdir_p(dir) }
end
#download_raw_logs_to_working_folder(date) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 86

# Streams every raw S3 log for +date+ into the raw working folder.
# Raises when the configured bucket does not exist.
def download_raw_logs_to_working_folder(date)
  s3 = RightAws::S3.new(*aws_keys)
  bucket = s3.bucket(@raw_logs_bucket)
  raise "Unknown bucket: #{@raw_logs_bucket}" if bucket.nil?
  bucket.keys(:prefix => raw_logs_prefix(date)).each do |raw_log|
    log "Getting #{raw_log.name}"
    target = File.join(raw_working_path, raw_log.name)
    File.open(target, "w") do |file|
      # Stream in chunks so a large log never sits fully in memory.
      s3.interface.get(@raw_logs_bucket, raw_log.name) { |chunk| file.write(chunk) }
    end
  end
end
#fetch_and_merge(raw_date_range) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 23

# Top-level pipeline: fetch the raw logs for +raw_date_range+ and merge
# them into the per-day processed logs. The ensure block always sorts,
# re-zips and moves the processed files back, even when a step fails.
def fetch_and_merge(raw_date_range)
  # A raw log-file for a date may contain observations from the day
  # before, so the processed range starts one day earlier.
  processed_date_range = (raw_date_range.first - 1)..(raw_date_range.last)

  create_working_folders
  copy_processed_logs_to_working_folder(processed_date_range)
  unzip_processed(processed_date_range)

  raw_date_range.each do |date|
    download_raw_logs_to_working_folder(date)
    unzip_raw(date)
    merge_raw_into_processed(date)
    remove_raw_logs(date)
  end
ensure
  sort_processed(processed_date_range)
  zip_processed(processed_date_range)
  move_processed_back(processed_date_range)
  remove_working_path
end
#log(msg) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 196

# Prints +msg+ to stdout; silenced when TEST_RUN is defined.
def log(msg)
  puts msg unless defined?(TEST_RUN)
end
#merge_raw_into_processed(date) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 109

# Appends every valid observation line from the raw logs for +date+ to
# the per-day processed files, skipping raw files that were already
# merged earlier. Output files are opened lazily and closed via ensure.
def merge_raw_into_processed(date)
  start_time = Time.now
  count = 0
  out_files = {}
  raw_log_paths(date).each do |raw_log|
    if raw_log_already_processed?(raw_log)
      log "Skipping #{raw_log}"
      next
    else
      log "Processing #{raw_log}"
    end
    File.foreach raw_log do |line|
      log_line = LogLine.new(line)
      next unless log_line.valid?
      # Use the date stamped on the line itself: a raw file can contain
      # observations from the previous day. (FIX: the original clobbered
      # the +date+ method parameter here; renamed to a local.)
      line_date = log_line.date
      name = File.join(processed_working_path, processed_log_name(line_date))
      out_files[name] ||= File.open(name, "a")
      out_files[name] << log_line.normalize
      count += 1
    end
  end
ensure
  out_files.each_value { |file| file.close }
  # Measure elapsed time once so the count and the rate agree.
  elapsed = (Time.now - start_time).to_f
  log "#{count} rader på #{elapsed}s (#{count / elapsed} rader/s)"
end
#move_processed_back(date_range) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 184

# Moves the re-zipped processed logs from the working folder back to
# @processed_logs_path.
def move_processed_back(date_range)
  date_range.each do |date|
    gz_name = "#{processed_log_name(date)}.gz"
    src = File.join(processed_working_path, gz_name)
    next unless File.exist?(src)
    log "Moving #{gz_name} back"
    FileUtils.move(src, File.join(@processed_logs_path, gz_name))
  end
end
#processed_log_name(date) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 58

# Name (without the .gz suffix) of the processed log file for +date+.
def processed_log_name(date)
  date.strftime("observer-log-%Y-%m-%d")
end
#processed_working_path ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 54

# Scratch directory ("p") where processed logs are assembled.
def processed_working_path
  File.join(@working_path, "p")
end
#raw_log_already_processed?(log_file_name) ⇒ Boolean
# File 'lib/logbox/observation_compiler.rb', line 135

# Heuristic: a raw log counts as processed when its last observation
# already appears in the corresponding processed working file.
#
# Returns Boolean.
def raw_log_already_processed?(log_file_name)
  # Look for the last observation to see if it is already processed.
  last_observation = `tail -n 1 #{log_file_name}`
  log_line = LogLine.new(last_observation)
  return false unless log_line.valid?
  processed_file_name = File.join(processed_working_path, processed_log_name(log_line.date))
  # FIX: chomp the grep *pattern*, not the file name. `tail` output ends
  # with a newline, and a trailing newline in a -F pattern adds an empty
  # pattern that matches every line — so this returned true whenever the
  # processed file was merely non-empty.
  File.exist?(processed_file_name) &&
    system("grep", "-qF", last_observation.chomp, processed_file_name)
end
#raw_log_paths(date) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 66

# Sorted paths of the downloaded raw log files for +date+.
def raw_log_paths(date)
  pattern = File.join(raw_working_path, "#{raw_logs_prefix(date)}*")
  Dir.glob(pattern).sort
end
#raw_logs_prefix(date) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 62

# S3 key prefix for the raw logs of +date+,
# e.g. "observer-log-2011-03-05".
def raw_logs_prefix(date)
  @raw_logs_prefix + date.strftime('%Y-%m-%d')
end
#raw_working_path ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 50

# Scratch directory ("r") where raw logs are downloaded.
def raw_working_path
  File.join(@working_path, "r")
end
#remove_raw_logs(date) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 145

# Deletes the downloaded raw log files for +date+ from the working folder.
def remove_raw_logs(date)
  log "Removing raw logs for #{date}"
  raw_log_paths(date).each { |path| FileUtils.rm(path) }
end
#remove_working_path ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 70

# Recursively removes the whole per-process scratch folder.
def remove_working_path
  FileUtils.rm_r @working_path
end
#secret_access_key_from_keychain(key_id) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 221

# Looks up the secret access key for +key_id+ in the OS X keychain via
# the `security` command, memoizing results per key id.
# Returns nil when no password is found.
def secret_access_key_from_keychain(key_id)
  @credentials ||= {}
  unless @credentials[key_id]
    dump = `security -q find-generic-password -a "#{key_id}" -g 2>&1`
    @credentials[key_id] = dump[/password: "(.*)"/, 1]
  end
  @credentials[key_id]
end
#secret_access_key_from_keychain!(key_id) ⇒ Object
These two methods are borrowed from Awsborn.
# File 'lib/logbox/observation_compiler.rb', line 215

# Like #secret_access_key_from_keychain, but raises when no key is
# found. These two methods are borrowed from Awsborn.
def secret_access_key_from_keychain!(key_id)
  secret = secret_access_key_from_keychain(key_id)
  raise "Could not find secret access key for #{key_id}" if secret.to_s.empty?
  secret
end
#sort_processed(date_range) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 152

# Sorts and de-duplicates each unzipped processed log in place with the
# external sort(1) (fields 2-4, ':'-separated) piped through uniq.
#
# FIX: pass LC_ALL=C only to the child process via Kernel#system's env
# hash instead of mutating the global ENV, which leaked the byte-wise
# collation override into the rest of the process.
def sort_processed(date_range)
  date_range.each do |date|
    name = processed_log_name(date)
    Dir.chdir processed_working_path do
      next unless File.exist?(name)
      log "Sorting #{name}"
      ok = system({ 'LC_ALL' => 'C' }, "sort -t: -k2,4 #{name} | uniq > #{name}.sorted")
      raise "Sort error!" unless ok
      File.rename("#{name}.sorted", name)
    end
  end
end
#unzip_processed(date_range) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 175

# Gunzips the processed log files that were copied into the working folder.
def unzip_processed(date_range)
  log "Unzipping processed files"
  date_range.each do |date|
    path = File.join(processed_working_path, "#{processed_log_name(date)}.gz")
    system "gunzip #{path}" if File.exist? path
  end
end
#unzip_raw(date) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 102

# Gunzips any downloaded raw logs for +date+ that are still compressed.
def unzip_raw(date)
  log "Unzipping raw logs for #{date}"
  raw_log_paths(date).each do |path|
    system "gunzip #{path}" if path.end_with?(".gz")
  end
end
#zip_processed(date_range) ⇒ Object
# File 'lib/logbox/observation_compiler.rb', line 166

# Gzips each processed log file before it is moved back into place.
def zip_processed(date_range)
  log "Zipping processed files"
  date_range.each do |date|
    path = File.join(processed_working_path, processed_log_name(date))
    system "gzip #{path}" if File.exist? path
  end
end