Class: Raindrops::Aggregate::PMQ

Inherits:
Object
  • Object
show all
Defined in:
lib/raindrops/aggregate/pmq.rb

Overview

Aggregate + POSIX message queues support for Ruby 1.9 and Linux

This class is duck-type compatible with Aggregate and allows us to aggregate and share statistics from multiple processes/threads aided POSIX message queues. This is designed to be used with the Raindrops::LastDataRecv Rack application, but can be used independently on compatible Runtimes.

Unlike the core of raindrops, this is only supported on Ruby 1.9 and Linux 2.6. Using this class requires the following additional RubyGems or libraries:

  • aggregate (tested with 0.2.2)

  • io-extra (tested with 1.2.3)

  • posix_mq (tested with 1.0.0)

Design

There is one master thread which aggregates statistics. Individual worker processes or threads will write to a shared POSIX message queue (default: “/raindrops”) that the master reads from. At a predefined interval, the master thread will write out to a shared, anonymous temporary file that workers may read from

Setting :worker_interval and :master_interval to 1 will result in perfect accuracy but at the cost of a high synchronization overhead. Larger intervals mean less frequent messaging for higher performance but lower accuracy.

Constant Summary collapse

RDLOCK =

:stopdoc: These constants are for Linux. This is designed for aggregating TCP_INFO.

[ Fcntl::F_RDLCK ].pack("s @256")
WRLOCK =
[ Fcntl::F_WRLCK ].pack("s @256")
UNLOCK =
[ Fcntl::F_UNLCK ].pack("s @256")

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params = {}) ⇒ PMQ

Creates a new Raindrops::Aggregate::PMQ object

Raindrops::Aggregate::PMQ.new(options = {})  -> aggregate

options is a hash that accepts the following keys:

  • :queue - name of the POSIX message queue (default: “/raindrops”)

  • :worker_interval - interval to send to the master (default: 10)

  • :master_interval - interval to for the master to write out (default: 5)

  • :lossy - workers drop packets if master cannot keep up (default: false)

  • :aggregate - Aggregate object (default: Aggregate.new)

  • :mq_umask - umask for creatingthe POSIX message queue (default: 0666)



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/raindrops/aggregate/pmq.rb', line 65

def initialize(params = {})
  opts = {
    :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
    :worker_interval => 10,
    :master_interval => 5,
    :lossy => false,
    :mq_attr => nil,
    :mq_umask => 0666,
    :aggregate => Aggregate.new,
  }.merge! params
  @master_interval = opts[:master_interval]
  @worker_interval = opts[:worker_interval]
  @aggregate = opts[:aggregate]
  @worker_queue = @worker_interval ? [] : nil
  @mutex = Mutex.new

  @mq_name = opts[:queue]
  mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]
  Tempfile.open("raindrops_pmq") do |t|
    @wr = File.open(t.path, "wb")
    @rd = File.open(t.path, "rb")
  end
  @cached_aggregate = @aggregate
  flush_master
  @mq_send = if opts[:lossy]
    @nr_dropped = 0
    mq.nonblock = true
    mq.method :trysend
  else
    mq.method :send
  end
end

Instance Attribute Details

#nr_droppedObject (readonly)

returns the number of dropped messages sent to a POSIX message queue if non-blocking operation was desired with :lossy



49
50
51
# File 'lib/raindrops/aggregate/pmq.rb', line 49

def nr_dropped
  @nr_dropped
end

Instance Method Details

#<<(val) ⇒ Object

adds a sample to the underlying Aggregate object



99
100
101
102
103
104
105
106
107
108
109
# File 'lib/raindrops/aggregate/pmq.rb', line 99

def << val
  if q = @worker_queue
    q << val
    if q.size >= @worker_interval
      mq_send(q) or @nr_dropped += 1
      q.clear
    end
  else
    mq_send(val) or @nr_dropped += 1
  end
end

#aggregateObject

Loads the last shared Aggregate from the master thread/process



150
151
152
153
154
155
156
157
# File 'lib/raindrops/aggregate/pmq.rb', line 150

def aggregate
  @cached_aggregate ||= begin
    flush
    Marshal.load(synchronize(@rd, RDLOCK) do |rd|
      IO.pread rd.fileno, rd.stat.size, 0
    end)
  end
end

#countObject

proxy for Aggregate#count



208
# File 'lib/raindrops/aggregate/pmq.rb', line 208

def count; aggregate.count; end

#eachObject

proxy for Aggregate#each



235
# File 'lib/raindrops/aggregate/pmq.rb', line 235

def each; aggregate.each { |*args| yield(*args) }; end

#each_nonzeroObject

proxy for Aggregate#each_nonzero



238
# File 'lib/raindrops/aggregate/pmq.rb', line 238

def each_nonzero; aggregate.each_nonzero { |*args| yield(*args) }; end

#flushObject

flushes the local queue of the worker process, sending all pending data to the master. There is no need to call this explicitly as :worker_interval defines how frequently your queue will be flushed



199
200
201
202
203
204
205
# File 'lib/raindrops/aggregate/pmq.rb', line 199

def flush
  if q = @local_queue && ! q.empty?
    mq_send q
    q.clear
  end
  nil
end

#flush_masterObject

Flushes the currently aggregate statistics to a temporary file. There is no need to call this explicitly as :worker_interval defines how frequently your data will be flushed for workers to read.



162
163
164
165
166
167
168
# File 'lib/raindrops/aggregate/pmq.rb', line 162

def flush_master
  dump = Marshal.dump @aggregate
  synchronize(@wr, WRLOCK) do |wr|
    wr.truncate 0
    IO.pwrite wr.fileno, dump, 0
  end
end

#lock!(io, type) ⇒ Object

:nodoc:



178
179
180
181
182
# File 'lib/raindrops/aggregate/pmq.rb', line 178

def lock! io, type # :nodoc:
  io.fcntl Fcntl::F_SETLKW, type
  rescue Errno::EINTR
    retry
end

#master_loopObject

Starts running a master loop, usually in a dedicated thread or process:

Thread.new { agg.master_loop }

Any worker can call agg.stop_master_loop to stop the master loop (possibly causing the thread or process to exit)



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/raindrops/aggregate/pmq.rb', line 123

def master_loop
  buf = ""
  a = @aggregate
  nr = 0
  mq = POSIX_MQ.new @mq_name, :r # this one is always blocking
  begin
    if (nr -= 1) < 0
      nr = @master_interval
      flush_master
    end
    mq.shift(buf)
    data = begin
      Marshal.load(buf) or return
    rescue ArgumentError, TypeError
      next
    end
    Array === data ? data.each { |x| a << x } : a << data
  rescue Errno::EINTR
  rescue => e
    warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
    break
  end while true
  ensure
    flush_master
end

#maxObject

proxy for Aggregate#max



211
# File 'lib/raindrops/aggregate/pmq.rb', line 211

def max; aggregate.max; end

#meanObject

proxy for Aggregate#mean



220
# File 'lib/raindrops/aggregate/pmq.rb', line 220

def mean; aggregate.mean; end

#minObject

proxy for Aggregate#min



214
# File 'lib/raindrops/aggregate/pmq.rb', line 214

def min; aggregate.min; end

#mq_send(val) ⇒ Object

:nodoc:



111
112
113
114
# File 'lib/raindrops/aggregate/pmq.rb', line 111

def mq_send(val) # :nodoc:
  @cached_aggregate = nil
  @mq_send.call Marshal.dump(val)
end

#outliers_highObject

proxy for Aggregate#outliers_high



229
# File 'lib/raindrops/aggregate/pmq.rb', line 229

def outliers_high; aggregate.outliers_high; end

#outliers_lowObject

proxy for Aggregate#outliers_low



226
# File 'lib/raindrops/aggregate/pmq.rb', line 226

def outliers_low; aggregate.outliers_low; end

#std_devObject

proxy for Aggregate#std_dev



223
# File 'lib/raindrops/aggregate/pmq.rb', line 223

def std_dev; aggregate.std_dev; end

#stop_master_loopObject

stops the currently running master loop, may be called from any worker thread or process



172
173
174
175
176
# File 'lib/raindrops/aggregate/pmq.rb', line 172

def stop_master_loop
  sleep 0.1 until mq_send(false)
  rescue Errno::EINTR
    retry
end

#sumObject

proxy for Aggregate#sum



217
# File 'lib/raindrops/aggregate/pmq.rb', line 217

def sum; aggregate.sum; end

#synchronize(io, type) ⇒ Object

we use both a mutex for thread-safety and fcntl lock for process-safety



185
186
187
188
189
190
191
192
193
194
# File 'lib/raindrops/aggregate/pmq.rb', line 185

def synchronize io, type # :nodoc:
  @mutex.synchronize do
    begin
      lock! io, type
      yield io
    ensure
      lock! io, UNLOCK
    end
  end
end

#to_s(*args) ⇒ Object

proxy for Aggregate#to_s



232
# File 'lib/raindrops/aggregate/pmq.rb', line 232

def to_s(*args); aggregate.to_s(*args); end