Class: Promiscuous::Subscriber::Worker::Stats

Inherits:
Object
  • Object
show all
Defined in:
lib/promiscuous/subscriber/worker/stats.rb

Instance Method Summary collapse

Instance Method Details

#aggregate_stats ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/promiscuous/subscriber/worker/stats.rb', line 27

# Drains the statistics counters accumulated since the previous run and
# reports the message rate and the mean latency for that window.
#
# Both Redis counters are read and reset to 0 with GETSET inside a single
# MULTI transaction. A summary line is written to STDERR over the previously
# printed line, and the raw numbers are handed to
# Promiscuous::Config.on_stats.
#
# @return [Object] whatever the on_stats callback returns
def aggregate_stats
  processed_future = nil
  response_time_future = nil
  # Queue the commands on the object yielded by #multi. Older redis-rb yields
  # the client itself, so this is backward-compatible; newer releases only
  # return futures from the yielded transaction object.
  @redis.multi do |transaction|
    processed_future = transaction.getset(@key_processed_message, 0)
    response_time_future = transaction.getset(@key_total_response_time, 0)
  end

  # Take a single timestamp so that the window used for the rate is exactly
  # the window delimited by two consecutive @last_aggregate values.
  now = Time.now
  elapsed = now - @last_aggregate
  @last_aggregate = now
  processed_messages = processed_future.value.to_i
  total_response_time = response_time_future.value.to_i

  # A non-positive window (e.g. the wall clock stepped backwards) would give
  # a negative or infinite rate; report 0.0 instead.
  rate = elapsed > 0 ? processed_messages.to_f / elapsed : 0.0
  rate_str = sprintf("%.1f", rate)
  latency = 0
  latency_str = "N/A"
  unless processed_messages.zero?
    # The response time counter is accumulated in milliseconds; convert the
    # per-message mean to seconds.
    latency = total_response_time.to_f / (1000 * processed_messages)
    if latency > 2.minutes
      latency_str = sprintf("%.3fmin", latency / 1.minute)
    else
      latency_str = sprintf("%.3fsec", latency)
    end
  end

  # "\e[1A" moves the cursor up one line and the backspaces rewind to the
  # start of it, so each report overwrites the previous one; the trailing
  # spaces erase leftovers from a longer previous line.
  STDERR.puts "\e[1A" + "\b" * 200 + "Messages: Rate: #{rate_str} msg/s  Latency: #{latency_str}" + " " * 30
  Promiscuous::Config.on_stats.call(rate, latency)
end

#connect ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/promiscuous/subscriber/worker/stats.rb', line 2

# Sets up statistics collection: opens the stats Redis connection, zeroes the
# per-host counters and starts the periodic aggregation timer.
#
# Does nothing beyond refreshing @interval when the configured stats interval
# is zero or a Redis connection already exists.
#
# @return [Object, nil] the result of starting the timer, or nil when skipped
def connect
  @interval = Promiscuous::Config.stats_interval.to_f
  return if @interval.zero? || @redis

  @redis = ::Redis.new(:url => Promiscuous::Config.redis_stats_url)

  # Counter keys are built under this subscriber's hostname.
  host_key = Promiscuous::Key.new(:sub).join(Socket.gethostname)
  @key_processed_message = host_key.join('__stats__', 'processed_messages').to_s
  @key_total_response_time = host_key.join('__stats__', 'total_response_time').to_s

  # Start every counter from zero.
  [@key_processed_message, @key_total_response_time].each do |counter_key|
    @redis.set(counter_key, 0)
  end
  @last_aggregate = Time.now

  # Emit an empty line that the first stats report can overwrite.
  STDERR.puts ""

  @timer ||= Promiscuous::Timer.new("stats", @interval) { aggregate_stats }
  @timer.start
end

#disconnect ⇒ Object



22
23
24
25
# File 'lib/promiscuous/subscriber/worker/stats.rb', line 22

# Resets the stats timer (when one exists) and closes the Redis connection on
# a best-effort basis: any StandardError raised while closing is swallowed.
#
# NOTE(review): on redis-rb >= 4 the low-level connection appears to be
# exposed as `_client` rather than `client`; if so, the call below raises and
# is silently ignored. Confirm against the bundled redis-rb version.
#
# @return [Object, nil] the result of the disconnect call, or nil
def disconnect
  @timer.try(:reset)
  return unless @redis

  begin
    @redis.client.disconnect
  rescue StandardError
    nil
  end
end

#notify_processed_message(msg, time) ⇒ Object



57
58
59
60
61
62
63
64
65
# File 'lib/promiscuous/subscriber/worker/stats.rb', line 57

# Records one processed message in the Redis statistics counters.
#
# Does nothing when no stats Redis connection has been set up. Otherwise the
# processed-message counter is incremented and, unless the message timestamp
# is zero, the elapsed time between that timestamp and `time` is added to the
# total response time counter.
#
# @param msg [#timestamp] processed message; its timestamp is treated as
#   integer milliseconds, with 0 meaning "no timestamp"
# @param time [Time] moment the message finished processing
# @return [Object, nil] the result of the pipelined call, or nil when skipped
def notify_processed_message(msg, time)
  return unless @redis

  # Whole milliseconds since the epoch for `time`.
  finished_at_ms = time.to_i * 1000 + time.usec / 1000
  elapsed_ms = finished_at_ms - msg.timestamp

  # Queue the commands on the object yielded by #pipelined. Older redis-rb
  # yields the client itself, so this is backward-compatible; newer releases
  # only pipeline commands sent to the yielded object.
  @redis.pipelined do |pipeline|
    pipeline.incr(@key_processed_message)
    pipeline.incrby(@key_total_response_time, elapsed_ms) unless msg.timestamp.zero?
  end
end