Class: ObservationCompiler::Job

Inherits:
Object
Defined in:
lib/logbox/observation_compiler.rb

Constant Summary

# Default path of the YAML credentials file consulted first by #aws_keys.
DEFAULT_KEY_FILE = '/etc/s3_key.yml'

Instance Method Summary

Constructor Details

#initialize(options = {}) ⇒ Job

Returns a new instance of Job.



15
16
17
18
19
20
21
# File 'lib/logbox/observation_compiler.rb', line 15

# Builds a compiler job.
#
# @param options [Hash] optional overrides:
#   :raw_logs_bucket     - S3 bucket holding the raw logs (default "rwdata-logs")
#   :raw_logs_prefix     - name prefix of the raw log objects (default "observer-log-")
#   :processed_logs_path - directory holding the compiled .gz logs (default "local_files")
#   :working_path        - scratch directory; defaults to
#                          "<tmp>observation_compiler/<pid>", where <tmp> is
#                          "/apps/smartass/tmp/" when /apps/smartass exists, else "/tmp/"
def initialize(options = {})
  @raw_logs_bucket     = options[:raw_logs_bucket] || "rwdata-logs"
  @raw_logs_prefix     = options[:raw_logs_prefix] || "observer-log-"
  @processed_logs_path = options[:processed_logs_path] || "local_files"
  temp_dir = File.exist?("/apps/smartass") ? "/apps/smartass/tmp/" : "/tmp/"
  # Plain `||`, not `||=`: the computed default must not be written back into
  # the caller's options hash (which may also be frozen).
  @working_path        = options[:working_path] || "#{temp_dir}observation_compiler/#{Process.pid}"
end

Instance Method Details

#aws_keys ⇒ Object



203
204
205
206
207
208
209
210
211
212
# File 'lib/logbox/observation_compiler.rb', line 203

# Returns the AWS credentials as +[access_key_id, secret_access_key]+.
#
# When DEFAULT_KEY_FILE exists it is read as YAML. Both symbol keys
# (+:access_key_id:+) and plain string keys (+access_key_id:+) are accepted.
# Otherwise the key id comes from ENV['OBSENTER_S3_KEY'] and the secret is
# looked up with #secret_access_key_from_keychain!, which raises when nothing
# is found.
#
# NOTE(review): with Psych 4 (Ruby 3.1+), YAML.load_file is safe-by-default
# and may reject :symbol keys; string keys avoid that.
def aws_keys
  # File.exist?: File.exists? was deprecated and removed in Ruby 3.2.
  if File.exist?(DEFAULT_KEY_FILE)
    # An empty YAML document loads as false/nil; treat it as "no keys".
    hash = YAML.load_file(DEFAULT_KEY_FILE) || {}
    [hash[:access_key_id] || hash['access_key_id'],
     hash[:secret_access_key] || hash['secret_access_key']]
  else
    access_key_id = ENV['OBSENTER_S3_KEY']
    secret_access_key = secret_access_key_from_keychain!(access_key_id)
    [access_key_id, secret_access_key]
  end
end

#copy_processed_logs_to_working_folder(date_range) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
# File 'lib/logbox/observation_compiler.rb', line 74

# Copies the gzipped processed log of every date in +date_range+ from
# @processed_logs_path into the processed working folder. Dates that have
# no processed file yet are skipped without a message.
def copy_processed_logs_to_working_folder(date_range)
  date_range.each do |day|
    gz_name = "#{processed_log_name(day)}.gz"
    from = File.join(@processed_logs_path, gz_name)
    next unless File.exist?(from)

    log "Copying #{gz_name} to working folder"
    FileUtils.copy(from, File.join(processed_working_path, gz_name))
  end
end

#create_working_folders ⇒ Object



45
46
47
48
# File 'lib/logbox/observation_compiler.rb', line 45

# Makes sure both scratch directories exist, raw first and then processed.
# Missing parent directories are created as well.
def create_working_folders
  [raw_working_path, processed_working_path].each { |dir| FileUtils.mkdir_p(dir) }
end

#download_raw_logs_to_working_folder(date) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/logbox/observation_compiler.rb', line 86

# Downloads every object in @raw_logs_bucket whose key starts with
# raw_logs_prefix(date) into the raw working folder. Each object is streamed
# chunk by chunk into a local file named after its key.
#
# @raise [RuntimeError] if the bucket cannot be found
#
# NOTE(review): the key name is used verbatim as a relative path; keys that
# contain "/" or ".." would land outside raw_working_path. Confirm keys are flat.
def download_raw_logs_to_working_folder(date)
  s3 = RightAws::S3.new(*aws_keys)
  bucket = s3.bucket(@raw_logs_bucket)
  raise "Unknown bucket: #{@raw_logs_bucket}" if bucket.nil?

  raw_logs = bucket.keys(:prefix => raw_logs_prefix(date))
  raw_logs.each do |raw_log|
    log "Getting #{raw_log.name}"
    # Binary mode: raw logs may be gzip data (#unzip_raw gunzips "*.gz"), and
    # text mode may translate newlines or transcode bytes on some platforms.
    File.open(File.join(raw_working_path, raw_log.name), "wb") do |file|
      s3.interface.get(@raw_logs_bucket, raw_log.name) { |chunk| file.write(chunk) }
    end
  end
end

#fetch_and_merge(raw_date_range) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/logbox/observation_compiler.rb', line 23

# Compiles the raw observer logs for +raw_date_range+ into the processed logs.
#
# Steps:
# 1. Create the scratch folders.
# 2. Copy and gunzip the processed logs that exist for the range, widened by
#    one day (see below).
# 3. For each raw date, download, gunzip, merge and delete the raw logs.
# 4. The ensure clause always sorts/deduplicates, gzips and moves the
#    processed files back to @processed_logs_path, then deletes the working
#    path.
#
# NOTE(review): because step 4 runs in +ensure+, partially merged data is
# still moved back when an earlier step raises. If one of the ensure steps
# itself raises, that exception replaces the original one and the remaining
# steps (including remove_working_path) are skipped. An example is
# sort_processed's Dir.chdir when the working folders were never created.
# If raw_date_range.first raises, processed_date_range is still nil inside
# the ensure clause.
def fetch_and_merge(raw_date_range)
  # A raw log-file for a date may contain observations from the day before,
  # so the processed range starts one day earlier.
  processed_date_range = (raw_date_range.first-1)..(raw_date_range.last)

  create_working_folders
  copy_processed_logs_to_working_folder(processed_date_range)
  unzip_processed(processed_date_range)

  raw_date_range.each do |date|
    download_raw_logs_to_working_folder(date)
    unzip_raw(date)
    merge_raw_into_processed(date)
    remove_raw_logs(date)
  end

ensure
  # Finalize even on failure: sort, compress, publish, then clean up.
  sort_processed(processed_date_range)
  zip_processed(processed_date_range)
  move_processed_back(processed_date_range)
  remove_working_path
end

#log(msg) ⇒ Object



196
197
198
199
200
# File 'lib/logbox/observation_compiler.rb', line 196

# Prints +msg+ to standard output. Output is suppressed whenever a TEST_RUN
# constant is defined.
def log(msg)
  return if defined?(TEST_RUN)
  puts msg
end

#merge_raw_into_processed(date) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/logbox/observation_compiler.rb', line 109

# Appends the valid lines of every not-yet-processed raw log for +date+ to the
# processed working file for the date carried by each line. A raw log can
# hold lines for other dates, such as the day before.
#
# - Lines are written as LogLine#normalize returns them.
# - Invalid lines are dropped.
# - Raw logs that #raw_log_already_processed? reports as done are skipped.
# - A line-count/throughput summary is logged even when an error interrupts
#   the merge.
def merge_raw_into_processed(date)
  start_time = Time.now
  count = 0
  out_files = {}
  raw_log_paths(date).each do |raw_log|
    if raw_log_already_processed?(raw_log)
      log "Skipping #{raw_log}"
      next
    end
    log "Processing #{raw_log}"
    File.foreach(raw_log) do |line|
      log_line = LogLine.new(line)
      next unless log_line.valid?
      # Use the line's own date without reassigning the +date+ parameter,
      # which the original code silently overwrote here.
      name = File.join(processed_working_path, processed_log_name(log_line.date))
      # One append handle per output file, reused across lines; all are
      # closed in the ensure clause.
      out_files[name] ||= File.open(name, "a")
      out_files[name] << log_line.normalize
      count += 1
    end
  end
ensure
  out_files.each_value(&:close)
  # Measure once so both numbers in the summary agree.
  elapsed = (Time.now - start_time).to_f
  # Log text is Swedish: "<n> rows in <t>s (<rate> rows/s)".
  log "#{count} rader på #{elapsed}s (#{count/elapsed} rader/s)"
end

#move_processed_back(date_range) ⇒ Object



184
185
186
187
188
189
190
191
192
193
194
# File 'lib/logbox/observation_compiler.rb', line 184

# Moves the gzipped processed log of every date in +date_range+ that exists
# in the processed working folder back into @processed_logs_path. Dates
# without a working file are skipped.
def move_processed_back(date_range)
  date_range.each do |day|
    gz_name = "#{processed_log_name(day)}.gz"
    from = File.join(processed_working_path, gz_name)
    next unless File.exist?(from)

    log "Moving #{gz_name} back"
    FileUtils.move(from, File.join(@processed_logs_path, gz_name))
  end
end

#processed_log_name(date) ⇒ Object



58
59
60
# File 'lib/logbox/observation_compiler.rb', line 58

# Base name, without ".gz", of the processed log for +date+,
# e.g. "observer-log-2011-03-04".
def processed_log_name(date)
  stamp = date.strftime('%Y-%m-%d')
  "observer-log-" + stamp
end

#processed_working_path ⇒ Object



54
55
56
# File 'lib/logbox/observation_compiler.rb', line 54

# Scratch folder for processed logs: the "p" subdirectory of @working_path.
def processed_working_path
  subdir = "p"
  File.join(@working_path, subdir)
end

#raw_log_already_processed?(log_file_name) ⇒ Boolean

Returns:

  • (Boolean)


135
136
137
138
139
140
141
142
143
# File 'lib/logbox/observation_compiler.rb', line 135

# Heuristically decides whether +log_file_name+ has already been merged.
# The file's last line is parsed as a LogLine. If it is valid, the line is
# looked up verbatim (fixed-string grep) in the processed working file for
# that line's date.
#
# @param log_file_name [String] path of an uncompressed raw log
# @return [Boolean, nil] true when the last line already occurs in the
#   processed file; false or nil otherwise (nil if grep could not run)
def raw_log_already_processed?(log_file_name)
  # Argument vector, no shell: raw log names derive from S3 key names and may
  # contain spaces or shell metacharacters.
  last_observation = IO.popen(["tail", "-n", "1", log_file_name], &:read)
  log_line = LogLine.new(last_observation)
  return false unless log_line.valid?
  processed_file_name = File.join(processed_working_path, processed_log_name(log_line.date))
  # Chomp the *pattern*, not the file name. grep splits its pattern on
  # newlines, so a trailing "\n" can add an empty pattern that matches every
  # line. "-e" also stops a line starting with "-" from being parsed as an
  # option.
  File.exist?(processed_file_name) &&
    system("grep", "-qF", "-e", last_observation.chomp, processed_file_name)
end

#raw_log_paths(date) ⇒ Object



66
67
68
# File 'lib/logbox/observation_compiler.rb', line 66

# Paths, sorted, of the files in the raw working folder whose names start
# with raw_logs_prefix(date).
def raw_log_paths(date)
  pattern = File.join(raw_working_path, raw_logs_prefix(date) + "*")
  Dir[pattern].sort
end

#raw_logs_prefix(date) ⇒ Object



62
63
64
# File 'lib/logbox/observation_compiler.rb', line 62

# Name prefix of the raw logs for +date+: @raw_logs_prefix followed by the
# date as YYYY-MM-DD.
def raw_logs_prefix(date)
  day_stamp = date.strftime('%Y-%m-%d')
  @raw_logs_prefix + day_stamp
end

#raw_working_path ⇒ Object



50
51
52
# File 'lib/logbox/observation_compiler.rb', line 50

# Scratch folder for raw logs: the "r" subdirectory of @working_path.
def raw_working_path
  subdir = "r"
  File.join(@working_path, subdir)
end

#remove_raw_logs(date) ⇒ Object



145
146
147
148
149
150
# File 'lib/logbox/observation_compiler.rb', line 145

# Deletes every raw log for +date+ from the raw working folder. Like
# FileUtils.rm, it raises at the first file that cannot be removed.
def remove_raw_logs(date)
  log "Removing raw logs for #{date}"
  FileUtils.rm(raw_log_paths(date))
end

#remove_working_path ⇒ Object



70
71
72
# File 'lib/logbox/observation_compiler.rb', line 70

# Recursively deletes the whole scratch directory (@working_path). Raises if
# it does not exist.
def remove_working_path
  FileUtils.rm_r(@working_path)
end

#secret_access_key_from_keychain(key_id) ⇒ Object



221
222
223
224
225
226
227
228
229
# File 'lib/logbox/observation_compiler.rb', line 221

# Looks up the password stored for account +key_id+ with
# `security find-generic-password` and memoizes it per key id in @credentials.
# Returns nil when no password line is found, including when the +security+
# tool is not installed. A nil result is not memoized, so the lookup is
# retried on the next call.
def secret_access_key_from_keychain(key_id)
  @credentials ||= {}
  unless @credentials[key_id]
    dump = begin
      # Argument vector, not a shell string, so key_id (read from ENV by
      # #aws_keys) cannot inject shell commands. stderr is merged into the
      # captured output, as the former "2>&1" did; the "password:" line is
      # parsed from that combined text.
      IO.popen(["security", "-q", "find-generic-password", "-a", key_id.to_s, "-g",
                { :err => [:child, :out] }], &:read)
    rescue Errno::ENOENT
      # Tool missing: behave like the shell version, which captured only a
      # "not found" message here.
      ""
    end
    @credentials[key_id] = dump[/password: "(.*)"/, 1]
  end
  @credentials[key_id]
end

#secret_access_key_from_keychain!(key_id) ⇒ Object

This method and #secret_access_key_from_keychain are borrowed from Awsborn.



215
216
217
218
219
# File 'lib/logbox/observation_compiler.rb', line 215

# Strict variant of #secret_access_key_from_keychain. It returns the secret
# for +key_id+, or raises when the lookup yields nil or an empty string.
#
# @raise [RuntimeError] "Could not find secret access key for <key_id>"
def secret_access_key_from_keychain!(key_id)
  secret = secret_access_key_from_keychain(key_id)
  return secret unless secret.to_s.empty?
  raise "Could not find secret access key for #{key_id}"
end

#sort_processed(date_range) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/logbox/observation_compiler.rb', line 152

# For each date in +date_range+ whose processed working file exists:
# - sorts the file on ":"-separated fields 2-4 using byte order (LC_ALL=C);
# - drops adjacent duplicate lines;
# - replaces the file in place.
#
# @raise [RuntimeError] "Sort error!" if sort or uniq fails; the file is then
#   left untouched and no scratch files remain.
def sort_processed(date_range)
  # Byte-order collation for the child processes only, rather than mutating
  # ENV for the whole Ruby process.
  collation_env = { 'LC_ALL' => 'C' }
  date_range.each do |date|
    name = processed_log_name(date)
    Dir.chdir processed_working_path do
      next unless File.exist?(name)
      log "Sorting #{name}"
      tmp = "#{name}.sorting"
      sorted = "#{name}.sorted"
      begin
        # Two commands instead of `sort | uniq`: a shell pipeline reports only
        # uniq's exit status, so a failed sort (e.g. out of temp space) went
        # unnoticed and its truncated output replaced the file.
        ok = system(collation_env, "sort", "-t:", "-k2,4", "-o", tmp, name) &&
             system(collation_env, "uniq", tmp, sorted)
        raise "Sort error!" unless ok
        File.rename(sorted, name)
      ensure
        # After a successful rename +sorted+ is gone; this only removes leftovers.
        [tmp, sorted].each { |f| File.delete(f) if File.exist?(f) }
      end
    end
  end
end

#unzip_processed(date_range) ⇒ Object



175
176
177
178
179
180
181
182
# File 'lib/logbox/observation_compiler.rb', line 175

# Gunzips, in place, the "<processed_log_name>.gz" file of every date in
# +date_range+ that exists in the processed working folder.
#
# @raise [RuntimeError] if gunzip fails for a file
def unzip_processed(date_range)
  log "Unzipping processed files"
  date_range.each do |date|
    file = File.join(processed_working_path, processed_log_name(date) + ".gz")
    next unless File.exist?(file)
    # Argument vector: the path is not parsed by a shell, so spaces in
    # @working_path are safe. A failure used to be ignored. New observations
    # would then be merged into a fresh uncompressed file next to the
    # untouched .gz, and could be lost when the working folder is removed.
    raise "Unzip error: #{file}" unless system("gunzip", file)
  end
end

#unzip_raw(date) ⇒ Object



102
103
104
105
106
107
# File 'lib/logbox/observation_compiler.rb', line 102

# Gunzips, in place, every raw log for +date+ whose name ends in ".gz".
# Other raw logs are left as they are.
#
# @raise [RuntimeError] if gunzip fails for a file
def unzip_raw(date)
  log "Unzipping raw logs for #{date}"
  raw_log_paths(date).each do |raw_log|
    next unless raw_log.end_with?(".gz")
    # Argument vector, not a shell string. Raw log names come from S3 key
    # names and must never be interpreted by a shell (spaces, ";", "$(...)").
    # Failures are reported instead of silently leaving gzip data for the
    # merge step to read as text.
    raise "Unzip error: #{raw_log}" unless system("gunzip", raw_log)
  end
end

#zip_processed(date_range) ⇒ Object



166
167
168
169
170
171
172
173
# File 'lib/logbox/observation_compiler.rb', line 166

# Gzips, in place, the processed working file of every date in +date_range+
# that exists, producing "<processed_log_name>.gz".
#
# @raise [RuntimeError] if gzip fails for a file
def zip_processed(date_range)
  log "Zipping processed files"
  date_range.each do |date|
    file = File.join(processed_working_path, processed_log_name(date))
    next unless File.exist?(file)
    # Argument vector: no shell parsing of the path. A failure (e.g. the .gz
    # already exists) used to be ignored. #move_processed_back only moves
    # ".gz" files, so the merged data would then have been left behind for
    # #remove_working_path.
    raise "Zip error: #{file}" unless system("gzip", file)
  end
end