Class: LogMerge::IPResolver
- Inherits: Object
  - Object
    - LogMerge::IPResolver
- Defined in:
- lib/logmerge/resolver.rb
Constant Summary collapse
- FORMAT = 0
  Marshal format version number.
- THREADS = $TESTING ? 2 : 100
  Default number of threads.
Class Method Summary collapse
Instance Method Summary collapse
- #expire_entries(start_time) ⇒ Object
- #flush ⇒ Object
- #initialize(cache = {}) ⇒ IPResolver (constructor)
  A new instance of IPResolver.
- #marshal_dump ⇒ Object
- #marshal_load(dumped) ⇒ Object
- #resolve(ip) ⇒ Object
- #run(output, input) ⇒ Object
- #start_threads ⇒ Object
Constructor Details
#initialize(cache = {}) ⇒ IPResolver
Returns a new instance of IPResolver.
48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/logmerge/resolver.rb', line 48
#
# Sets up a resolver around +cache+, a Hash whose entries are stored by
# #resolve as ip => [name, ttl]. The same Hash object is kept (not copied).
def initialize(cache = {})
  # Lookup state.
  @cache      = cache
  @failures   = []                 # IPs whose lookup failed; dropped again in #marshal_dump
  @start_time = Time.now           # reference point passed to #expire_entries on dump
  @dns        = Resolv::DNS.new

  # Worker coordination.
  @queue   = SizedQueue.new(THREADS) # records waiting for a worker thread
  @threads = ThreadGroup.new
  @done    = false                   # set by #run once all input has been queued

  # Ordered output.
  @output = nil                    # assigned by #run
  @buf    = {}                     # line number => finished output line
  @cur    = 0                      # next line number #flush will write
end
Class Method Details
.create ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/logmerge/resolver.rb', line 17
#
# Yields a resolver, restoring it from the '.name_cache' file in the current
# directory when that file holds a usable Marshal dump and building a fresh
# one otherwise. After the block finishes (normally or by raising) the
# resolver is written back to '.name_cache'.
#
# Returns the value of the block.
#
# NOTE(review): Marshal.load can instantiate arbitrary objects. The cache is
# read from the working directory, so only run this where '.name_cache' is
# trusted.
def self.create
  resolver = nil

  if File.exist? '.name_cache'
    begin
      # Marshal data is binary; read it without newline/encoding conversion.
      resolver = Marshal.load File.binread('.name_cache')
    rescue ArgumentError, TypeError
      # Truncated data raises ArgumentError, an incompatible or garbage
      # header raises TypeError. Either way the cache is unusable, so fall
      # through and start with a fresh resolver.
    end
  end

  resolver ||= new

  yield resolver
ensure
  # resolver is still nil when reading the cache itself raised (for example
  # a permission error); do not replace the cache with a dumped nil then.
  if resolver
    begin
      File.open('.name_cache', 'wb') { |fp| Marshal.dump resolver, fp }
    rescue Errno::EACCES
      # ignore: the cache is best-effort
    end
  end
end
.resolve(output = STDOUT, input = ARGF) ⇒ Object
42 43 44 45 46 |
# File 'lib/logmerge/resolver.rb', line 42
#
# Convenience entry point: obtains a resolver through .create and has it
# process +input+, writing the result to +output+.
def self.resolve(output = STDOUT, input = ARGF)
  create { |resolver| resolver.run(output, input) }
end
Instance Method Details
#expire_entries(start_time) ⇒ Object
62 63 64 65 66 67 |
# File 'lib/logmerge/resolver.rb', line 62
#
# Ages every cache entry by the whole seconds elapsed since +start_time+ and
# removes the entries whose remaining TTL is zero or less.
#
# start_time - Time the stored TTLs are relative to.
#
# Returns nil.
def expire_entries(start_time)
  elapsed = (Time.now - start_time).to_i
  # A start_time in the future (clock skew, cache stamped by a clock that was
  # ahead) would give a negative value and lengthen every TTL; never age
  # backwards.
  elapsed = 0 if elapsed < 0

  @cache.each_value { |entry| entry[1] -= elapsed }
  @cache.delete_if { |_ip, (_name, ttl)| ttl <= 0 }

  nil
end
#flush ⇒ Object
69 70 71 72 73 74 |
# File 'lib/logmerge/resolver.rb', line 69
#
# Writes buffered lines to @output in line-number order, starting at @cur and
# stopping at the first line number that is not in @buf yet. Written lines
# are removed from @buf and @cur is advanced past them.
def flush
  while @buf.key?(@cur)
    ready_line = @buf.delete(@cur)
    @output.puts(ready_line)
    @cur += 1
  end
end
#marshal_dump ⇒ Object
76 77 78 79 80 81 82 |
# File 'lib/logmerge/resolver.rb', line 76
#
# Marshal hook. Drops the IPs recorded in @failures from the cache, ages the
# remaining entries, and returns the data to serialize.
#
# Returns [FORMAT, cache, timestamp], where timestamp is the instant the
# cached TTLs are relative to.
def marshal_dump
  @failures.each { |ip| @cache.delete ip }

  expire_entries @start_time # compact

  # The TTLs have just been aged up to now, so now is their new reference
  # point. Dumping the old @start_time would make #marshal_load subtract the
  # time between @start_time and this dump a second time; keeping it in
  # @start_time would do the same on a repeated dump.
  @start_time = Time.now

  [FORMAT, @cache, @start_time]
end
#marshal_load(dumped) ⇒ Object
84 85 86 87 88 89 90 91 92 |
# File 'lib/logmerge/resolver.rb', line 84
#
# Marshal hook. Rebuilds the resolver from the array produced by
# #marshal_dump.
#
# dumped - [format, cache, start_time].
#
# For format 0 the object is initialized with the stored cache, which is then
# aged by the time elapsed since the stored timestamp. Any other format is
# not understood; the stored cache is discarded and the object starts empty,
# so it is never left without its instance state.
def marshal_load(dumped)
  format, cache, start_time = dumped

  case format
  when 0
    initialize(cache)
    expire_entries start_time # expire
  else
    # Unknown dump version: behave like a brand-new resolver.
    initialize({})
  end
end
#resolve(ip) ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/logmerge/resolver.rb', line 94
#
# Looks up the host name for +ip+.
#
# A cached name is returned directly. Otherwise a PTR query is made; the
# first answer is cached with twice its TTL and returned. When +ip+ cannot be
# parsed as an IPv4 address it is returned unchanged and nothing is cached.
# When the query yields no answer, +ip+ is cached as its own name with a TTL
# of 0, recorded in @failures, and returned.
def resolve(ip)
  cached_name, = @cache[ip]
  return cached_name unless cached_name.nil?

  ptr_query =
    begin
      Resolv::IPv4.create(ip).to_name
    rescue ArgumentError
      return ip
    end

  @dns.each_resource(ptr_query, Resolv::DNS::Resource::IN::PTR) do |record|
    @cache[ip] = [record.name.to_s, record.ttl * 2]
    return record.name.to_s
  end

  # No PTR record came back.
  @cache[ip] = [ip, 0]
  @failures << ip
  ip
end
#run(output, input) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/logmerge/resolver.rb', line 116
#
# Reads +input+ line by line, hands each line to the worker threads, and
# writes the processed lines to +output+ in their original order.
#
# Each line is split at the first run of whitespace into an IP and the rest;
# the record queued for the workers is [line number, ip, rest].
def run(output, input)
  start_threads

  @output = output
  line_no = 0

  input.each_line do |line|
    ip, rest = line.split(' ', 2)

    # Blocks while the sized queue is full.
    @queue << [line_no, ip, rest]
    line_no += 1

    # Drain finished lines every MAX_INFLIGHT input lines.
    flush if (line_no % LogMerge::MAX_INFLIGHT).zero?
  end

  # Tell the workers that no more records will arrive, then wait for the
  # queue to drain and for every worker in the group to finish.
  @done = true
  Thread.pass until @queue.empty?

  @threads.enclose
  @threads.list.first.join until @threads.list.empty?

  flush
end
#start_threads ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/logmerge/resolver.rb', line 141
#
# Starts THREADS worker threads. Each worker joins the @threads group, then
# repeatedly takes a [line_no, ip, rest] record from @queue, resolves the IP
# and stores "name rest" in @buf under the record's line number.
def start_threads
  THREADS.times do
    Thread.start do
      @threads.add Thread.current

      loop do
        record =
          begin
            # Non-blocking pop: raises ThreadError when the queue is empty.
            @queue.pop(true)
          rescue ThreadError
            # Empty queue: stop once #run has flagged the end of input,
            # otherwise yield to other threads and try again.
            break if @done
            Thread.pass
            retry
          end

        line_no, ip, rest = record
        name = resolve(ip)
        @buf[line_no] = "#{name} #{rest}"
      end
    end
  end
end