Class: Raindrops::Aggregate::PMQ
- Inherits:
-
Object
- Object
- Raindrops::Aggregate::PMQ
- Defined in:
- lib/raindrops/aggregate/pmq.rb
Overview
Aggregate + POSIX message queues support for Ruby 1.9 and Linux
This class is duck-type compatible with Aggregate and allows us to aggregate and share statistics from multiple processes/threads aided POSIX message queues. This is designed to be used with the Raindrops::LastDataRecv Rack application, but can be used independently on compatible Runtimes.
Unlike the core of raindrops, this is only supported on Ruby 1.9 and Linux 2.6. Using this class requires the following additional RubyGems or libraries:
-
aggregate (tested with 0.2.2)
-
io-extra (tested with 1.2.3)
-
posix_mq (tested with 1.0.0)
Design
There is one master thread which aggregates statistics. Individual worker processes or threads will write to a shared POSIX message queue (default: “/raindrops”) that the master reads from. At a predefined interval, the master thread will write out to a shared, anonymous temporary file that workers may read from
Setting :worker_interval
and :master_interval
to 1
will result in perfect accuracy but at the cost of a high synchronization overhead. Larger intervals mean less frequent messaging for higher performance but lower accuracy.
Constant Summary collapse
- RDLOCK =
:stopdoc: These constants are for Linux. This is designed for aggregating TCP_INFO.
[ Fcntl::F_RDLCK ].pack("s @256")
- WRLOCK =
[ Fcntl::F_WRLCK ].pack("s @256")
- UNLOCK =
[ Fcntl::F_UNLCK ].pack("s @256")
Instance Attribute Summary collapse
-
#nr_dropped ⇒ Object
readonly
returns the number of dropped messages sent to a POSIX message queue if non-blocking operation was desired with :lossy.
Instance Method Summary collapse
-
#<<(val) ⇒ Object
adds a sample to the underlying Aggregate object.
-
#aggregate ⇒ Object
Loads the last shared Aggregate from the master thread/process.
-
#count ⇒ Object
proxy for Aggregate#count.
-
#each ⇒ Object
proxy for Aggregate#each.
-
#each_nonzero ⇒ Object
proxy for Aggregate#each_nonzero.
-
#flush ⇒ Object
flushes the local queue of the worker process, sending all pending data to the master.
-
#flush_master ⇒ Object
Flushes the currently aggregate statistics to a temporary file.
-
#initialize(params = {}) ⇒ PMQ
constructor
Creates a new Raindrops::Aggregate::PMQ object.
-
#lock!(io, type) ⇒ Object
:nodoc:.
-
#master_loop ⇒ Object
Starts running a master loop, usually in a dedicated thread or process:.
-
#max ⇒ Object
proxy for Aggregate#max.
-
#mean ⇒ Object
proxy for Aggregate#mean.
-
#min ⇒ Object
proxy for Aggregate#min.
-
#mq_send(val) ⇒ Object
:nodoc:.
-
#outliers_high ⇒ Object
proxy for Aggregate#outliers_high.
-
#outliers_low ⇒ Object
proxy for Aggregate#outliers_low.
-
#std_dev ⇒ Object
proxy for Aggregate#std_dev.
-
#stop_master_loop ⇒ Object
stops the currently running master loop, may be called from any worker thread or process.
-
#sum ⇒ Object
proxy for Aggregate#sum.
-
#synchronize(io, type) ⇒ Object
we use both a mutex for thread-safety and fcntl lock for process-safety.
-
#to_s(*args) ⇒ Object
proxy for Aggregate#to_s.
Constructor Details
#initialize(params = {}) ⇒ PMQ
Creates a new Raindrops::Aggregate::PMQ object
Raindrops::Aggregate::PMQ.new(options = {}) -> aggregate
options
is a hash that accepts the following keys:
-
:queue - name of the POSIX message queue (default: “/raindrops”)
-
:worker_interval - interval to send to the master (default: 10)
-
:master_interval - interval to for the master to write out (default: 5)
-
:lossy - workers drop packets if master cannot keep up (default: false)
-
:aggregate - Aggregate object (default: Aggregate.new)
-
:mq_umask - umask for creatingthe POSIX message queue (default: 0666)
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/raindrops/aggregate/pmq.rb', line 65 def initialize(params = {}) opts = { :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops", :worker_interval => 10, :master_interval => 5, :lossy => false, :mq_attr => nil, :mq_umask => 0666, :aggregate => Aggregate.new, }.merge! params @master_interval = opts[:master_interval] @worker_interval = opts[:worker_interval] @aggregate = opts[:aggregate] @worker_queue = @worker_interval ? [] : nil @mutex = Mutex.new @mq_name = opts[:queue] mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr] Tempfile.open("raindrops_pmq") do |t| @wr = File.open(t.path, "wb") @rd = File.open(t.path, "rb") end @cached_aggregate = @aggregate flush_master @mq_send = if opts[:lossy] @nr_dropped = 0 mq.nonblock = true mq.method :trysend else mq.method :send end end |
Instance Attribute Details
#nr_dropped ⇒ Object (readonly)
returns the number of dropped messages sent to a POSIX message queue if non-blocking operation was desired with :lossy
49 50 51 |
# File 'lib/raindrops/aggregate/pmq.rb', line 49 def nr_dropped @nr_dropped end |
Instance Method Details
#<<(val) ⇒ Object
adds a sample to the underlying Aggregate object
99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/raindrops/aggregate/pmq.rb', line 99 def << val if q = @worker_queue q << val if q.size >= @worker_interval mq_send(q) or @nr_dropped += 1 q.clear end else mq_send(val) or @nr_dropped += 1 end end |
#aggregate ⇒ Object
Loads the last shared Aggregate from the master thread/process
150 151 152 153 154 155 156 157 |
# File 'lib/raindrops/aggregate/pmq.rb', line 150 def aggregate @cached_aggregate ||= begin flush Marshal.load(synchronize(@rd, RDLOCK) do |rd| IO.pread rd.fileno, rd.stat.size, 0 end) end end |
#count ⇒ Object
proxy for Aggregate#count
208 |
# File 'lib/raindrops/aggregate/pmq.rb', line 208 def count; aggregate.count; end |
#each ⇒ Object
proxy for Aggregate#each
235 |
# File 'lib/raindrops/aggregate/pmq.rb', line 235 def each; aggregate.each { |*args| yield(*args) }; end |
#each_nonzero ⇒ Object
proxy for Aggregate#each_nonzero
238 |
# File 'lib/raindrops/aggregate/pmq.rb', line 238 def each_nonzero; aggregate.each_nonzero { |*args| yield(*args) }; end |
#flush ⇒ Object
flushes the local queue of the worker process, sending all pending data to the master. There is no need to call this explicitly as :worker_interval
defines how frequently your queue will be flushed
199 200 201 202 203 204 205 |
# File 'lib/raindrops/aggregate/pmq.rb', line 199 def flush if q = @local_queue && ! q.empty? mq_send q q.clear end nil end |
#flush_master ⇒ Object
Flushes the currently aggregate statistics to a temporary file. There is no need to call this explicitly as :worker_interval
defines how frequently your data will be flushed for workers to read.
162 163 164 165 166 167 168 |
# File 'lib/raindrops/aggregate/pmq.rb', line 162 def flush_master dump = Marshal.dump @aggregate synchronize(@wr, WRLOCK) do |wr| wr.truncate 0 IO.pwrite wr.fileno, dump, 0 end end |
#lock!(io, type) ⇒ Object
:nodoc:
178 179 180 181 182 |
# File 'lib/raindrops/aggregate/pmq.rb', line 178 def lock! io, type # :nodoc: io.fcntl Fcntl::F_SETLKW, type rescue Errno::EINTR retry end |
#master_loop ⇒ Object
Starts running a master loop, usually in a dedicated thread or process:
Thread.new { agg.master_loop }
Any worker can call agg.stop_master_loop
to stop the master loop (possibly causing the thread or process to exit)
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/raindrops/aggregate/pmq.rb', line 123 def master_loop buf = "" a = @aggregate nr = 0 mq = POSIX_MQ.new @mq_name, :r # this one is always blocking begin if (nr -= 1) < 0 nr = @master_interval flush_master end mq.shift(buf) data = begin Marshal.load(buf) or return rescue ArgumentError, TypeError next end Array === data ? data.each { |x| a << x } : a << data rescue Errno::EINTR rescue => e warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}" break end while true ensure flush_master end |
#max ⇒ Object
proxy for Aggregate#max
211 |
# File 'lib/raindrops/aggregate/pmq.rb', line 211 def max; aggregate.max; end |
#mean ⇒ Object
proxy for Aggregate#mean
220 |
# File 'lib/raindrops/aggregate/pmq.rb', line 220 def mean; aggregate.mean; end |
#min ⇒ Object
proxy for Aggregate#min
214 |
# File 'lib/raindrops/aggregate/pmq.rb', line 214 def min; aggregate.min; end |
#mq_send(val) ⇒ Object
:nodoc:
111 112 113 114 |
# File 'lib/raindrops/aggregate/pmq.rb', line 111 def mq_send(val) # :nodoc: @cached_aggregate = nil @mq_send.call Marshal.dump(val) end |
#outliers_high ⇒ Object
proxy for Aggregate#outliers_high
229 |
# File 'lib/raindrops/aggregate/pmq.rb', line 229 def outliers_high; aggregate.outliers_high; end |
#outliers_low ⇒ Object
proxy for Aggregate#outliers_low
226 |
# File 'lib/raindrops/aggregate/pmq.rb', line 226 def outliers_low; aggregate.outliers_low; end |
#std_dev ⇒ Object
proxy for Aggregate#std_dev
223 |
# File 'lib/raindrops/aggregate/pmq.rb', line 223 def std_dev; aggregate.std_dev; end |
#stop_master_loop ⇒ Object
stops the currently running master loop, may be called from any worker thread or process
172 173 174 175 176 |
# File 'lib/raindrops/aggregate/pmq.rb', line 172 def stop_master_loop sleep 0.1 until mq_send(false) rescue Errno::EINTR retry end |
#sum ⇒ Object
proxy for Aggregate#sum
217 |
# File 'lib/raindrops/aggregate/pmq.rb', line 217 def sum; aggregate.sum; end |
#synchronize(io, type) ⇒ Object
we use both a mutex for thread-safety and fcntl lock for process-safety
185 186 187 188 189 190 191 192 193 194 |
# File 'lib/raindrops/aggregate/pmq.rb', line 185 def synchronize io, type # :nodoc: @mutex.synchronize do begin lock! io, type yield io ensure lock! io, UNLOCK end end end |
#to_s(*args) ⇒ Object
proxy for Aggregate#to_s
232 |
# File 'lib/raindrops/aggregate/pmq.rb', line 232 def to_s(*args); aggregate.to_s(*args); end |