Class: Jiggler::Stats::Monitor

Inherits:
Object
  • Object
show all
Includes:
Jiggler::Support::Helper
Defined in:
lib/jiggler/stats/monitor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Jiggler::Support::Helper

#log_error, #log_error_short, #logger, #safe_async, #scan_all, #tid

Constructor Details

#initialize(config, collection) ⇒ Monitor

Returns a new instance of Monitor.



10
11
12
13
14
15
16
17
18
19
20
# File 'lib/jiggler/stats/monitor.rb', line 10

def initialize(config, collection)
  @config = config
  @collection = collection
  @done = false
  @condition = Async::Condition.new
  # the key expiration should be greater than the stats interval
  # to avoid cases where the monitor is blocked
  # by long running workers and the key is not updated in time
  @exp = @config[:stats_interval] + 180 # interval + 3 minutes
  @rss_path = "/proc/#{Process.pid}/status"
end

Instance Attribute Details

#collectionObject (readonly)

Returns the value of attribute collection.



8
9
10
# File 'lib/jiggler/stats/monitor.rb', line 8

def collection
  @collection
end

#configObject (readonly)

Returns the value of attribute config.



8
9
10
# File 'lib/jiggler/stats/monitor.rb', line 8

def config
  @config
end

#data_keyObject (readonly)

Returns the value of attribute data_key.



8
9
10
# File 'lib/jiggler/stats/monitor.rb', line 8

def data_key
  @data_key
end

#expObject (readonly)

Returns the value of attribute exp.



8
9
10
# File 'lib/jiggler/stats/monitor.rb', line 8

def exp
  @exp
end

Instance Method Details

#cleanupObject



80
81
82
83
84
85
86
# File 'lib/jiggler/stats/monitor.rb', line 80

def cleanup
  config.with_async_redis do |conn| 
    cleanup_with(conn)
  rescue => err
    log_error_short(err, { context: '\'Cleanup error\'', tid: @tid })
  end
end

#load_data_into_redisObject



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/jiggler/stats/monitor.rb', line 46

def load_data_into_redis
  # logger.warn('Monitor runs')
  processed_jobs = collection.data[:processed]
  failed_jobs = collection.data[:failures]
  collection.data[:processed] -= processed_jobs
  collection.data[:failures] -= failed_jobs

  config.with_async_redis do |conn|
    conn.pipelined do |pipeline|
      pipeline.call('SET', collection.identity, process_data, ex: exp)
      pipeline.call('INCRBY', config.processed_counter, processed_jobs)
      pipeline.call('INCRBY', config.failures_counter, failed_jobs)
    end

    cleanup_with(conn) if @done
  rescue => err
    log_error_short(err, { context: '\'Monitor loading stats error\'', tid: @tid })
  end
end

#process_dataObject



38
39
40
41
42
43
44
# File 'lib/jiggler/stats/monitor.rb', line 38

def process_data
  Oj.dump({
    heartbeat: Time.now.to_f,
    rss: process_rss,
    current_jobs: collection.data[:current_jobs],
  }, mode: :compat)
end

#process_rssObject



66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/jiggler/stats/monitor.rb', line 66

def process_rss
  case RUBY_PLATFORM
  when /linux/
    IO.readlines(@rss_path).each do |line|
      next unless line.start_with?('VmRSS:')
      break line.split[1].to_i
    end
  when /darwin|bsd/
    `ps -o pid,rss -p #{Process.pid}`.lines.last.split.last.to_i
  else
    nil
  end
end

#startObject



22
23
24
25
26
27
28
29
30
# File 'lib/jiggler/stats/monitor.rb', line 22

def start
  @job = safe_async('Monitor') do
    @tid = tid
    until @done
      load_data_into_redis
      wait unless @done
    end
  end
end

#terminateObject



32
33
34
35
36
# File 'lib/jiggler/stats/monitor.rb', line 32

def terminate
  @condition.signal
  @done = true
  cleanup
end

#waitObject



88
89
90
91
92
93
94
# File 'lib/jiggler/stats/monitor.rb', line 88

def wait
  Async(transient: true) do
    sleep(config[:stats_interval])
    @condition.signal
  end
  @condition.wait
end