Class: UState::Server::Index
- Inherits:
-
Object
- Object
- UState::Server::Index
- Defined in:
- lib/ustate/server/index.rb
Defined Under Namespace
Classes: ParseFailed
Constant Summary collapse
- THREADS =
1000
- BUFFER_SIZE =
10
- INSERT_RATE_INTERVAL =
Default interval between insert-rate metric updates (assigned to the insert-rate MetricThread's interval in #start).
5
- INSERT_TIMES_INTERVAL =
5
Instance Attribute Summary collapse
-
#db ⇒ Object
readonly
Returns the value of attribute db.
-
#insert_rate_interval ⇒ Object
Returns the value of attribute insert_rate_interval.
-
#insert_times_interval ⇒ Object
Returns the value of attribute insert_times_interval.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
- #<<(s) ⇒ Object
- #clear ⇒ Object
-
#delete(state) ⇒ Object
Removes a state from the index.
-
#initialize(opts = {}) ⇒ Index
constructor
A new instance of Index.
- #on_state(state = nil, &block) ⇒ Object
- #on_state_change(old = nil, new = nil, &block) ⇒ Object
- #on_state_once(state = nil, &block) ⇒ Object
- #process(s) ⇒ Object
-
#query(q) ⇒ Object
Returns an array of States matching Query.
-
#row_to_state(row) ⇒ Object
Converts a row to a State.
- #setup_db ⇒ Object
- #start ⇒ Object
-
#stop ⇒ Object
Finish up.
- #stop! ⇒ Object
- #thread(s) ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Index
Returns a new instance of Index.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/ustate/server/index.rb', line 22

# Builds a new index on top of a database handle obtained from Sequel.sqlite
# and creates the states table via #setup_db.
#
# @param opts [Hash] optional settings
# @option opts [Object] :server stored in @server
# @option opts [Object] :threads stored in @threads; defaults to THREADS
# @option opts [Object] :insert_rate_interval defaults to INSERT_RATE_INTERVAL
# @option opts [Object] :insert_times_interval defaults to INSERT_TIMES_INTERVAL
def initialize(opts = {})
  @db = Sequel.sqlite
  @server = opts[:server]
  @threads = opts[:threads] || THREADS
  @pool = []
  # #thread synchronizes on @pooltex; without this assignment it would be
  # nil and every call to #thread would fail with NoMethodError.
  @pooltex = Mutex.new

  @on_state_change = []
  @on_state_once = []
  @on_state = []

  @insert_rate_interval = opts[:insert_rate_interval] || INSERT_RATE_INTERVAL
  @insert_times_interval = opts[:insert_times_interval] || INSERT_TIMES_INTERVAL
  setup_db
end
Instance Attribute Details
#db ⇒ Object (readonly)
Returns the value of attribute db.
18 19 20 |
# File 'lib/ustate/server/index.rb', line 18

# Read-only accessor for @db, the database handle assigned in #initialize.
def db
  @db
end
#insert_rate_interval ⇒ Object
Returns the value of attribute insert_rate_interval.
19 20 21 |
# File 'lib/ustate/server/index.rb', line 19

# Accessor for @insert_rate_interval, which #start assigns to the
# insert-rate metric thread's interval.
def insert_rate_interval
  @insert_rate_interval
end
#insert_times_interval ⇒ Object
Returns the value of attribute insert_times_interval.
20 21 22 |
# File 'lib/ustate/server/index.rb', line 20

# Accessor for @insert_times_interval, which #start assigns to the
# insert-times metric thread's interval.
def insert_times_interval
  @insert_times_interval
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
18 19 20 |
# File 'lib/ustate/server/index.rb', line 18

# Read-only accessor for @queue.
# NOTE(review): @queue is not assigned in any method shown on this page, so
# this presumably returns nil unless it is set elsewhere -- confirm.
def queue
  @queue
end
Instance Method Details
#<<(s) ⇒ Object
42 43 44 45 46 47 48 |
# File 'lib/ustate/server/index.rb', line 42

# Processes a state via #process and records how long that took.
#
# The elapsed time is appended to @insert_times and a 1 is appended to
# @insert_rate. Both collectors are created only by #start, so they are
# skipped when absent; this lets states be inserted into an index that has
# not been started instead of failing with NoMethodError on nil after the
# state has already been processed.
#
# @param s the state to process (passed straight to #process)
# @return the result of `@insert_rate << 1`, or nil when no collector is set
def <<(s)
  t0 = Time.now
  process s
  dt = Time.now - t0

  @insert_times << dt if @insert_times
  @insert_rate << 1 if @insert_rate
end
#clear ⇒ Object
38 39 40 |
# File 'lib/ustate/server/index.rb', line 38

# Empties the index by delegating to #setup_db, which drops and recreates
# the states table.
#
# @return whatever #setup_db returns
def clear
  setup_db
end
#delete(state) ⇒ Object
Removes a state from the index.
Right now state is anything which responds to #host and #service. I’ll probably evolve the index to support arbitrary operations on all states matching a query, but haven’t thought out the API.
55 56 57 |
# File 'lib/ustate/server/index.rb', line 55

# Removes the row for a state from the states table.
#
# @param state any object responding to #host and #service; the row with
#   that host/service pair is deleted
# @return the result of the dataset's #delete call
def delete(state)
  key = { host: state.host, service: state.service }
  matching = @db[:states].filter(key)
  matching.delete
end
#on_state(state = nil, &block) ⇒ Object
88 89 90 91 92 93 94 95 96 |
# File 'lib/ustate/server/index.rb', line 88

# Registers or fires "state" callbacks, depending on how it is called.
#
# With a block: adds the block to @on_state (array union, so the same proc
# is not stored twice) and returns the updated list.
# Without a block: calls every registered callback with +state+ and returns
# the callback list.
#
# @param state the state handed to each callback when firing
def on_state(state = nil, &block)
  return @on_state.each { |listener| listener.call(state) } unless block_given?

  @on_state |= [block]
end
#on_state_change(old = nil, new = nil, &block) ⇒ Object
68 69 70 71 72 73 74 75 76 |
# File 'lib/ustate/server/index.rb', line 68

# Registers or fires "state changed" callbacks, depending on how it is called.
#
# With a block: adds the block to @on_state_change (array union, so the same
# proc is not stored twice) and returns the updated list.
# Without a block: calls every registered callback with +old+ and +new+ and
# returns the callback list.
#
# @param old the previous state handed to each callback when firing
# @param new the incoming state handed to each callback when firing
def on_state_change(old = nil, new = nil, &block)
  return @on_state_change.each { |listener| listener.call(old, new) } unless block_given?

  @on_state_change |= [block]
end
#on_state_once(state = nil, &block) ⇒ Object
78 79 80 81 82 83 84 85 86 |
# File 'lib/ustate/server/index.rb', line 78

# Registers or fires callbacks for one-shot states (#process fires these
# for states whose #once is truthy).
#
# With a block: adds the block to @on_state_once (array union, so the same
# proc is not stored twice) and returns the updated list.
# Without a block: calls every registered callback with +state+ and returns
# the callback list.
#
# @param state the state handed to each callback when firing
def on_state_once(state = nil, &block)
  return @on_state_once.each { |listener| listener.call(state) } unless block_given?

  @on_state_once |= [block]
end
#process(s) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/ustate/server/index.rb', line 98

# Applies an incoming state to the index and notifies callbacks.
#
# * A state whose #once is truthy is never stored: the once-callbacks and
#   then the regular state callbacks fire, and the method returns.
# * If no row exists for the state's host/service, one is inserted.
# * If a row exists and is not newer than the incoming state, it is
#   updated; state-change callbacks fire first when the stored state value
#   differs. A row newer than the incoming state is left untouched.
# * In every stored/stale case the regular state callbacks fire last.
#
# @param s the incoming state (reads #once, #host, #service, #state, #time,
#   #description and #metric_f)
# @return the result of the final on_state call
def process(s)
  if s.once
    on_state_once s
    return on_state(s)
  end

  key = { host: s.host, service: s.service }
  fields = {
    state: s.state,
    time: s.time,
    description: s.description,
    metric_f: s.metric_f
  }

  states = @db[:states]
  existing = states[key]

  if existing.nil?
    # First sighting of this host/service pair.
    states.insert(key.merge(fields))
  elsif existing[:time] <= s.time
    # Only announce a change when the state value itself differs.
    on_state_change(row_to_state(existing), s) unless existing[:state] == s.state
    states.filter(key).update(fields)
  end

  on_state s
end
#query(q) ⇒ Object
Returns an array of States matching Query.
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/ustate/server/index.rb', line 135

# Returns an array of States matching a query.
#
# When the query has a string, it is parsed with QueryStringParser and the
# resulting expression's #sql is used as the dataset filter. When it has no
# string, the filter is `true`, i.e. all states.
#
# @param q an object responding to #string
# @return [Array] each matching row converted with #row_to_state
# @raise [ParseFailed] when the parser returns a falsy result for the string
def query(q)
  parser = QueryStringParser.new

  condition =
    if q.string
      expression = parser.parse(q.string)
      unless expression
        raise ParseFailed, "error parsing #{q.string.inspect} at line #{parser.failure_line}:#{parser.failure_column}: #{parser.failure_reason}"
      end
      expression.sql
    else
      # No string? All states.
      true
    end

  @db[:states].filter(condition).all.map { |row| row_to_state(row) }
end
#row_to_state(row) ⇒ Object
Converts a row to a State.
154 155 156 |
# File 'lib/ustate/server/index.rb', line 154

# Converts a database row to a State by passing the row to State.new.
#
# @param row the row as returned by the states dataset
# @return [State]
def row_to_state(row)
  State.new row
end
#setup_db ⇒ Object
158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/ustate/server/index.rb', line 158

# (Re)creates the states table, discarding any existing contents.
#
# Uses Sequel's create_table!, which drops an existing table before
# creating it. This replaces a `drop_table ... rescue nil` that silently
# swallowed every StandardError raised while dropping, not just the
# "table does not exist" case on first run.
#
# Columns: host, service, state (strings), description (text), time
# (integer), metric_f (float); the primary key is the (host, service) pair.
def setup_db
  @db.create_table! :states do
    String :host
    String :service
    String :state
    String :description, text: true
    Integer :time
    Float :metric_f

    primary_key [:host, :service]
  end
end
#start ⇒ Object
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/ustate/server/index.rb', line 171

# Starts (or restarts) the index's self-monitoring metric threads.
#
# Kills any pooled worker threads via #stop!, stops metric threads left
# over from a previous #start via #stop, and resets the pool. Without the
# #stop call a second #start would overwrite @insert_rate / @insert_times
# while the old MetricThreads were still live, leaking them.
#
# Two MetricThreads are then created. Each reporter block inserts states
# into this index describing its own insert behavior:
# * "ustate insert rate" -- metric_f is the collector's rate.
# * "ustate insert 50" / "95" / "99" -- metric_f is `collector % N`
#   (presumably the Nth percentile of insert durations -- confirm against
#   Mtrc::SortedSamples).
#
# @return the value assigned to the insert-times thread's interval
def start
  stop!
  stop
  @pool = []

  @insert_rate = MetricThread.new(Mtrc::Rate) do |r|
    self << State.new(
      service: "ustate insert rate",
      state: "ok",
      host: Socket.gethostname,
      time: Time.now.to_i,
      metric_f: r.rate.to_f
    )
  end
  @insert_rate.interval = @insert_rate_interval

  @insert_times = MetricThread.new(Mtrc::SortedSamples) do |r|
    [50, 95, 99].each do |percentile|
      self << State.new(
        service: "ustate insert #{percentile}",
        state: "ok",
        host: Socket.gethostname,
        time: Time.now.to_i,
        metric_f: r % percentile
      )
    end
  end
  @insert_times.interval = @insert_times_interval
end
#stop ⇒ Object
Finish up.
213 214 215 216 |
# File 'lib/ustate/server/index.rb', line 213

# Finish up: stops both metric threads.
#
# Each stop is best-effort: any StandardError (including the NoMethodError
# raised when the thread was never created and the variable is nil) is
# rescued and treated as nil, so one failure does not prevent the other
# thread from being stopped.
#
# @return the result of stopping the insert-times thread, or nil on error
def stop
  begin
    @insert_rate.stop
  rescue StandardError
    nil
  end

  begin
    @insert_times.stop
  rescue StandardError
    nil
  end
end
#stop! ⇒ Object
218 219 220 221 222 |
# File 'lib/ustate/server/index.rb', line 218

# Kills every thread currently held in @pool.
#
# @return [Array] the pool itself
def stop!
  @pool.each(&:kill)
end
#thread(s) ⇒ Object
59 60 61 62 63 64 65 66 |
# File 'lib/ustate/server/index.rb', line 59

# Processes a state on a new thread that is tracked in @pool.
#
# The thread is added to @pool while holding @pooltex, so #stop! can find
# and kill it; previously the thread was never added and the removal at the
# end was a no-op. Because the worker must take the same lock to remove
# itself, it cannot do so before it has been registered. Removal happens in
# an ensure clause so a failing #process does not leave a dead thread in
# the pool.
#
# Relies on @pooltex holding a Mutex before this is called.
#
# @param s the state handed to #process
# @return [Thread] the spawned worker
def thread(s)
  worker = nil

  @pooltex.synchronize do
    worker = Thread.new do
      begin
        process s
      ensure
        @pooltex.synchronize { @pool.delete Thread.current }
      end
    end
    @pool << worker
  end

  worker
end