Class: GRI::AppCollector
- Defined in:
- lib/gri/pcollector.rb,
lib/gri/app_collector.rb
Instance Attribute Summary
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#metrics ⇒ Object
readonly
Returns the value of attribute metrics.
-
#writers ⇒ Object
readonly
Returns the value of attribute writers.
Instance Method Summary
- #fillup_queue(n, sock_path, targets, scheduler) ⇒ Object
- #fork_child(server_sock, sock_path, nproc, targets, log_dir, scheduler_class, fdh) ⇒ Object
- #get_max_queue_size ⇒ Object
- #get_ptargets(targets, basetime, duration, offset = 0, default_interval = 300) ⇒ Object
- #get_targets_from_lines(lines, config) ⇒ Object
- #get_tra_uri(tra_str) ⇒ Object
-
#initialize(config) ⇒ AppCollector
constructor
A new instance of AppCollector.
- #load_fake_descr_files(files) ⇒ Object
- #load_target_lines(config) ⇒ Object
- #parse_host_key(s) ⇒ Object
- #run ⇒ Object
- #run_para(targets, scheduler_class, start_time, fdh) ⇒ Object
- #run_single(targets, scheduler_class, start_time, fdh) ⇒ Object
- #server_loop(targets, ptargets, server_sock, waittime) ⇒ Object
Constructor Details
#initialize(config) ⇒ AppCollector
Returns a new instance of AppCollector.
12 13 14 15 16 |
# File 'lib/gri/app_collector.rb', line 12
#
# Sets up a collector around +config+: no writers are registered yet and
# every metric counter reads as zero until it is incremented.
#
# @param config [Object] configuration object, stored as-is
def initialize(config)
  @metrics = Hash.new(0)
  @writers = []
  @config = config
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
10 11 12 |
# File 'lib/gri/app_collector.rb', line 10
#
# Read-only accessor for the +@config+ instance variable.
#
# @return [Object] the stored configuration
def config
  @config
end
#metrics ⇒ Object (readonly)
Returns the value of attribute metrics.
10 11 12 |
# File 'lib/gri/app_collector.rb', line 10
#
# Read-only accessor for the +@metrics+ instance variable.
#
# @return [Object] the stored metrics container
def metrics
  @metrics
end
#writers ⇒ Object (readonly)
Returns the value of attribute writers.
10 11 12 |
# File 'lib/gri/app_collector.rb', line 10
#
# Read-only accessor for the +@writers+ instance variable.
#
# @return [Object] the stored writers collection
def writers
  @writers
end
Instance Method Details
#fillup_queue(n, sock_path, targets, scheduler) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/gri/pcollector.rb', line 128
#
# Tops up +scheduler.queue+ by repeatedly connecting to the UNIX socket at
# +sock_path+, reading one line per connection, and pushing the target whose
# index is the first whitespace-separated field of that line. Stops once the
# queue holds #get_max_queue_size entries.
#
# @param n [Object] unused by this method (kept for the existing call sites)
# @param sock_path [String] path of the UNIX socket to connect to
# @param targets [Array] list indexed by the number read from the socket
# @param scheduler [#queue, #process_queue] receives each fetched target
# @return [Boolean] true when fetching stopped because the socket file is
#   gone, the connection failed, or no line could be read; false when the
#   queue simply reached its limit
def fillup_queue n, sock_path, targets, scheduler
  exhausted = false
  limit = get_max_queue_size
  while scheduler.queue.size < limit
    # Fresh handle per iteration so a failed connect never touches the
    # previous (already closed) socket.
    sock = nil
    begin
      unless File.socket? sock_path
        exhausted = true
        break
      end
      sock = UNIXSocket.new sock_path
    rescue Errno::ECONNREFUSED
      # Back off for a randomised delay, then re-check the socket file and
      # try again.
      sleep(0.1 + rand)
      retry
    rescue SystemCallError
      exhausted = true
      break
    end

    line = begin
      sock.gets
    rescue StandardError
      nil
    ensure
      sock.close
    end
    unless line
      exhausted = true
      break
    end

    # Only the leading index is used; the rest of the line is ignored.
    num, = line.split
    scheduler.queue.push targets[num.to_i]
    scheduler.process_queue
  end
  exhausted
end
#fork_child(server_sock, sock_path, nproc, targets, log_dir, scheduler_class, fdh) ⇒ Object
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/gri/pcollector.rb', line 165
#
# Forks +nproc+ worker processes. Each child closes its copy of the server
# socket, builds its own event loop and scheduler, keeps pulling targets via
# #fillup_queue until the feed is exhausted and the loop has no active
# watchers, then dumps its metrics to
# "<log_dir>/res.<parent pid>.<child pid>.dump".
#
# @param server_sock [#close] listening socket inherited from the parent
# @param sock_path [String] socket path handed to #fillup_queue
# @param nproc [Integer] number of children to fork
# @param targets [Array] target list handed to #fillup_queue
# @param log_dir [String] directory receiving the per-child dump files
# @param scheduler_class [Class] instantiated with (loop, metrics) per child
# @param fdh [Object] assigned to the scheduler's fake_descr_hash
# @return [Array] the pids returned by fork, in spawn order
def fork_child server_sock, sock_path, nproc, targets, log_dir, scheduler_class, fdh
  parent_pid = $$
  (1..nproc).map {|n|
    fork {
      server_sock.close
      started = Time.now
      # Stagger the children: child n waits n * 50ms before starting.
      sleep 0.05 * n
      Log.debug "child ##{n}"

      ev_loop = Loop.new
      @writers.each {|w| w.loop = ev_loop}
      scheduler = scheduler_class.new ev_loop, @metrics
      scheduler.queue = []
      scheduler.writers = @writers
      scheduler.fake_descr_hash = fdh

      drained = fillup_queue n, sock_path, targets, scheduler
      scheduler.process_queue
      if !drained || ev_loop.has_active_watchers?
        while true
          ev_loop.run_once
          break if drained && !ev_loop.has_active_watchers?
          drained = fillup_queue n, sock_path, targets, scheduler
          scheduler.process_queue
        end
      end
      scheduler.finalize

      run_count = @metrics[:run_count]
      elapsed = Time.now - started
      Log.debug "end ##{n} #{run_count} #{run_count/elapsed}"
      #@metrics["run_count#{n}".intern] = rc
      begin
        dump_path = "#{log_dir}/res.#{parent_pid}.#{$$}.dump"
        Marshal.dump_to_file @metrics, dump_path
      rescue SystemCallError
        Log.error "#{$!}"
      end
    }
  }
end
#get_max_queue_size ⇒ Object
124 125 126 |
# File 'lib/gri/pcollector.rb', line 124
#
# Queue length at which #fillup_queue stops fetching further targets.
#
# @return [Integer] always 4
def get_max_queue_size
  4
end
#get_ptargets(targets, basetime, duration, offset = 0, default_interval = 300) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/gri/pcollector.rb', line 96
#
# Builds the polling schedule: a hash mapping a start time to the list of
# indices into +targets+ that are due at that time.
#
# With a zero +duration+ every target is scheduled once at +basetime+.
# Otherwise each target is scheduled at every multiple of its interval that
# falls in [basetime, basetime + duration), with +offset+ added to the key.
# Targets whose interval evaluates to 0 are left out of the schedule.
#
# @param targets [Array<Array>] [host, options] pairs; options['interval']
#   overrides +default_interval+ when present
# @param basetime [Integer] start of the scheduling window
# @param duration [Integer] length of the window; 0 means "run once"
# @param offset [Integer] added to every scheduled time
# @param default_interval [Integer] interval for targets without their own
# @return [Hash{Integer=>Array<Integer>}] time => target indices
def get_ptargets targets, basetime, duration, offset=0, default_interval=300
  return {basetime => (0...targets.size).to_a} if duration.zero?

  # Group target indices by polling interval. The index must always be the
  # target's real position in +targets+ (callers look targets up by it), so
  # skipped targets still consume an index.
  by_interval = {}
  targets.each_with_index {|(_host, options), idx|
    interval = (options['interval'] || default_interval).to_i
    next if interval.zero?
    (by_interval[interval] ||= []).push idx
  }

  ptargets = {}
  end_time = basetime + duration
  by_interval.each {|interval, indices|
    # Align slots to multiples of the interval at or before basetime.
    first_slot = basetime - basetime % interval
    (0..duration/interval).each {|k|
      t = first_slot + k * interval
      next unless t >= basetime && t < end_time
      (ptargets[t + offset] ||= []).concat indices
    }
  }
  ptargets
end
#get_targets_from_lines(lines, config) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# NOTE(review): this rendering of #get_targets_from_lines has lost several
# identifiers (see "Config.(", "= .clone" and ".merge! .replace" below), so
# the body is not valid Ruby as shown. What remains visible: targets come from
# Config.get_targets_from_lines, are optionally filtered by the 'host-pat'
# regexps, and each target then has options merged in, including the result of
# Config.option_if_match(host, 'option-if-host', config). Recover the exact
# source from lib/gri/app_collector.rb before editing this method.
# File 'lib/gri/app_collector.rb', line 121 def get_targets_from_lines lines, config targets = Config.get_targets_from_lines lines = Config.(*(config.getvar 'option')) .merge!(Config.(*config.getvar('O'))) if config['host-pat'] hosts_re = config.getvar('host-pat').map {|h| Regexp.new h} targets = targets.select {|host, | hosts_re.detect {|re| re === host}} #re = Regexp.new config['host-pat'] #targets = targets.select {|host, options| re === host} end for host, in targets = .clone .merge! Config.option_if_match(host, 'option-if-host', config) .merge! .replace end targets end |
#get_tra_uri(tra_str) ⇒ Object
94 95 96 97 98 99 100 101 |
# File 'lib/gri/app_collector.rb', line 94
#
# Turns a tra location string into a URI object.
#
# A bare "host:port" becomes "http://host:port/", a bare "host" becomes
# "http://host:7080/", and anything else is parsed unchanged.
#
# @param tra_str [String] host, host:port, or a full URI string
# @return [URI::Generic, nil] the parsed URI, or nil when the string is not
#   a valid URI
def get_tra_uri tra_str
  location =
    case tra_str
    when /\A[-\w\.]+:\d+\z/ then "http://#{tra_str}/"
    when /\A[-\w\.]+\z/ then "http://#{tra_str}:7080/"
    else tra_str
    end
  URI.parse location
rescue URI::Error
  # Only URI parsing failures map to nil; unrelated errors are not hidden.
  nil
end
#load_fake_descr_files(files) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/gri/app_collector.rb', line 76
#
# Reads "fake description" files into a nested hash.
#
# Every line of the form "<host>_<key> <description>" contributes
# result[host][key] = description, where host is the text before the first
# underscore. Lines that do not match and paths that do not exist are
# skipped; later entries overwrite earlier ones for the same host and key.
#
# @param files [Enumerable<String>] paths to read
# @return [Hash{String=>Hash{String=>String}}] host => { key => description }
def load_fake_descr_files files
  descrs = {}
  files.each {|path|
    next unless File.exist? path
    # File.open rather than Kernel#open: a path is always treated as a file
    # name and can never be interpreted as a "|command" pipe.
    File.open(path) {|f|
      f.each_line {|line|
        next unless line =~ /\A([-\.\dA-Za-z]+_\S+)\s+(.*)/
        host_key, descr = $1, $2
        host, key = host_key.split('_', 2)
        (descrs[host] ||= {})[key] = descr
      }
    }
  }
  descrs
end
#load_target_lines(config) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/gri/app_collector.rb', line 103
#
# Returns the raw gritab lines, either fetched remotely or read from disk.
#
# The remote path (RemoteLDB.get_gritab_lines) is taken only when 'updater'
# and 'tra' are set and no explicit 'gritab-path' is configured. Otherwise
# the file at 'gritab-path', or "<root-dir>/gritab", is read line by line.
# As a side effect 'root-dir' is written back into +config+ (defaulting to
# Config::ROOT_PATH) on the local path.
#
# @param config [#[], #[]=] configuration lookup
# @return [Array<String>] gritab lines; line terminators are kept on the
#   local path
def load_target_lines config
  if config['updater'] && (tra_str = config['tra']) && !config['gritab-path']
    return RemoteLDB.get_gritab_lines(get_tra_uri(tra_str))
  end

  root_dir = (config['root-dir'] ||= Config::ROOT_PATH)
  gritab_path = config['gritab-path'] || root_dir + '/gritab'
  File.open(gritab_path) {|f| f.readlines}
end
#parse_host_key(s) ⇒ Object
72 73 74 |
# File 'lib/gri/app_collector.rb', line 72
#
# Splits a "<host>_<name>[_]<rest>" string into its three parts. The name
# part stops at the first underscore or digit; the remainder (after one
# optional underscore) is the third element and may be empty.
#
# @param s [#to_s] value to parse
# @return [Array<String>, nil] [host, name, rest], or nil when +s+ does not
#   have that shape
def parse_host_key s
  m = /\A([-\.A-Za-z0-9]+)_([^_\d]*)(?:_?(.*))/.match(s.to_s)
  m && m.captures
end
#run ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/gri/app_collector.rb', line 18
#
# Entry point of a collection pass: loads the targets, configures the
# updater classes when 'updater' is set, runs the collection in parallel
# ('para') or in-process, lets each writer merge and purge its logs, and
# records the 'targets' and 'collector_elapsed' metrics.
#
# @return [Float] the elapsed time stored in @metrics['collector_elapsed']
def run
  start_time = Time.now
  root_dir = config['root-dir'] ||= Config::ROOT_PATH

  lines = load_target_lines config
  targets = get_targets_from_lines lines, config
  # NOTE(review): this lookup goes through the Config constant while the
  # other settings are read from the config instance - confirm intended.
  files = Config.getvar 'fake-descr-file'
  fdh = files ? load_fake_descr_files(files) : nil

  if config['updater']
    tra_str = config['tra']
    if tra_str
      TraCollector.tra_uri = get_tra_uri tra_str
      TraCollector.db_class = RemoteLDB
    else
      TraCollector.tra_dir = config['tra-dir'] || root_dir + '/tra'
      TraCollector.db_class = LocalLDB
    end
    gra_dir = config['gra-dir'] || root_dir + '/gra'
    Dir.mkdir gra_dir unless File.directory? gra_dir
    TraCollector.gra_dir = gra_dir

    scheduler_class = UScheduler
    # Prefer the configured 'hostname' (or 'alias') as the target's name.
    targets.each {|entry|
      hostname = entry[1]['hostname'] || entry[1]['alias']
      entry[0] = hostname if hostname
    }
    # Keep only the first target for each resulting host name.
    seen = {}
    targets = targets.select {|host, _opts|
      first_time = !seen[host]
      seen[host] = true
      first_time
    }
  else
    scheduler_class = Scheduler
  end

  Log.info "START: pid #{$$}"
  if config['para']
    run_para targets, scheduler_class, start_time.to_i, fdh
  else
    run_single targets, scheduler_class, start_time.to_i, fdh
  end
  @writers.each {|writer|
    writer.merge if writer.respond_to? :merge
    writer.purge_logs if writer.respond_to? :purge_logs
  }
  Log.info "END: pid #{$$}"

  @metrics['targets'] = targets.size
  @metrics['collector_elapsed'] = Time.now - start_time
end
#run_para(targets, scheduler_class, start_time, fdh) ⇒ Object
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/gri/pcollector.rb', line 7
#
# Runs a collection pass with forked workers. Opens a UNIX server socket,
# builds the schedule with #get_ptargets, forks the children with
# #fork_child, hands targets out through #server_loop, waits for the
# children, and finally adds every "<log_dir>/res.<pid>.*.dump" result into
# @metrics.
#
# Returns early (after logging a fatal message) when the server socket
# cannot be created. Timeouts and system-call errors raised while forking or
# serving are logged, not propagated.
#
# @param targets [Array] target list
# @param scheduler_class [Class] passed through to #fork_child
# @param start_time [Integer] pass start time, used to align the schedule
# @param fdh [Object] passed through to #fork_child
# @return [nil]
def run_para targets, scheduler_class, start_time, fdh
  sock_path = config['sock-path'] || '/tmp/.gcollectsock'
  begin
    server_sock = UNIXServer.new sock_path
  rescue SystemCallError
    puts "#{$!}: server_sock error" if $debug
    Log.fatal "#{$!}: server_sock error"
    return
  end

  duration = (config['duration'] || 0).to_i
  if duration.zero?
    basetime = start_time
    offset = 0
  else
    # Align the window to a multiple of duration; offset keeps the original
    # phase of start_time inside that window.
    basetime = start_time - start_time % duration
    offset = start_time - basetime
  end
  interval = (config['interval'] || 300).to_i
  ptargets = get_ptargets targets, basetime, duration, offset, interval

  log_dir = config['log-dir'] || (config['root-dir'] + '/log')
  # Best-effort removal of result dumps left over from earlier passes.
  Dir.glob("#{log_dir}/res.*.dump") {|path| File.unlink path} rescue nil

  max_processes = (config['max-processes'] ||
                   config['max-fork-process']).to_i
  max_processes = 30 if max_processes < 1
  nproc = [targets.size * 2 / 3 + 1, max_processes].min
  waittime = [20, duration].min

  # Stays empty if forking fails, so the wait below is still safe.
  pids = []
  begin
    pids = fork_child server_sock, sock_path, nproc, targets, log_dir,
      scheduler_class, fdh
    server_loop targets, ptargets, server_sock, waittime
  rescue Timeout::Error, SystemCallError
    Log.error $!.inspect
  ensure
    server_sock.close
    Log.info "server_sock.close"
    File.unlink sock_path
  end
  pids.each {|pid| Process.waitpid pid}

  Dir.glob("#{log_dir}/res.#{$$}.*.dump") {|path|
    begin
      res = Marshal.load_from_file path
      res.each {|k, v| @metrics[k] += v}
      File.unlink path
    rescue SystemCallError
      Log.error "#{$!}"
    end
  }
end
#run_single(targets, scheduler_class, start_time, fdh) ⇒ Object
143 144 145 146 147 148 149 150 151 152 |
# File 'lib/gri/app_collector.rb', line 143
#
# Runs a collection pass inside the current process: one event loop, one
# scheduler fed with all +targets+ at once.
#
# @param targets [Array] becomes the scheduler's queue
# @param scheduler_class [Class] instantiated with (loop, metrics)
# @param start_time [Integer] unused by this method (kept so the signature
#   matches #run_para)
# @param fdh [Object] assigned to the scheduler's fake_descr_hash, the same
#   way #fork_child does for the parallel path
# @return [Object] whatever scheduler.finalize returns
def run_single targets, scheduler_class, start_time, fdh
  loop = Loop.new
  @writers.each {|writer| writer.loop = loop}
  scheduler = scheduler_class.new loop, @metrics
  scheduler.writers = @writers
  scheduler.queue = targets
  # Previously fdh was accepted but never handed to the scheduler.
  scheduler.fake_descr_hash = fdh
  scheduler.process_queue
  loop.run
  scheduler.finalize
end
#server_loop(targets, ptargets, server_sock, waittime) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/gri/pcollector.rb', line 57
#
# Hands out scheduled targets to connecting clients. For every scheduled
# time that has been reached, each due target index n is served to the next
# accepted connection as the line "<n> <host>"; the connection is then
# closed. Sleeps until the next scheduled time when nothing is due, and
# returns once every scheduled time has been processed.
#
# @param targets [Array<Array>] entries whose first element is the host name
# @param ptargets [Hash{Numeric=>Array<Integer>}] time => indices into targets
# @param server_sock [#accept] listening socket
# @param waittime [Numeric] seconds passed to Timeout.timeout for each accept
# @return [nil]
# @raise [Timeout::Error] when no client connects within +waittime+, or an
#   accepted connection is not writable within 20 seconds
def server_loop targets, ptargets, server_sock, waittime
  sock = nil
  pkeys = ptargets.keys.sort
  pts = []
  until pkeys.empty?
    now = Time.now.to_f
    t = pkeys.first
    if t > now
      sleep(t - now)
      next
    end

    pkeys.shift
    pts += ptargets[t]
    while (n = pts.shift)
      Timeout.timeout(waittime) {sock = server_sock.accept}
      begin
        unless IO.select(nil, [sock], nil, 20)
          raise Timeout::Error, 'select timeout'
        end
        sock.puts "#{n} #{targets[n].first}"
      ensure
        # Close on every path so a failed write cannot leak the connection.
        sock.close
      end

      # When this batch is finished and the next scheduled time is already
      # due, continue with it right away instead of going back to sleep.
      # pts is empty here, so the zip/flatten/compact amounts to loading
      # ptargets[t].
      if pts.empty? and (t = pkeys.first) and t <= Time.now.to_f
        pkeys.shift
        small, big = [pts, ptargets[t]].sort_by {|e| e.size}
        pts.replace big.zip(small)
        pts.flatten!
        pts.compact!
      end
    end
  end
end