Class: Twirl::Cluster
- Inherits:
-
Object
- Object
- Twirl::Cluster
- Extended by:
- Forwardable
- Includes:
- Enumerable
- Defined in:
- lib/twirl/cluster.rb
Constant Summary collapse
- RetryableErrors =
Private: The default Array of errors to retry.
[ KJess::NetworkError, # this inherits from protocol error, but seems like it should be retried KJess::ServerError, ]
Instance Attribute Summary collapse
-
#client_index ⇒ Object
readonly
Private: What is the array index of the client being used currently.
-
#command_count ⇒ Object
readonly
Private: The number of commands issued to the current client.
-
#commands_per_client ⇒ Object
readonly
Private: The number of commands to issue to a client before rotating.
-
#encoder ⇒ Object
readonly
Private: What handles dumping and loading values.
-
#instrumenter ⇒ Object
readonly
Private: What should be used to instrument all the things.
-
#retries ⇒ Object
readonly
Private: The number of times to retry retryable errors.
-
#retryable_errors ⇒ Object
readonly
Private: What errors should be considered retryable.
Instance Method Summary collapse
-
#client ⇒ Object
Private: Returns the client to be used to issue a command.
-
#client_read_op(client, op, queue_name, *args) ⇒ Object
Private: Perform an operation for a given client.
-
#delete(queue_name) ⇒ Object
Public : Remove a queue.
-
#disconnect ⇒ Object
Public: Disconnect from each client’s server.
-
#each(&block) ⇒ Object
Public: Iterate through the clients.
-
#flush(queue_name) ⇒ Object
Public: Remove all items from a queue.
-
#flush_all ⇒ Object
Public: Remove all items from all queues.
-
#get(queue_name, options = {}) ⇒ Object
Public: Retrieve an item from the given queue.
-
#initialize(clients, options = {}) ⇒ Cluster
constructor
Public: Initialize a new cluster.
-
#multi_client_op(op, *args, &block) ⇒ Object
Private: Perform an op on all the clients.
-
#multi_client_op_with_result(op, *args, &block) ⇒ Object
Private: Perform an op on all clients.
- #multi_client_queue_op_with_result(op, queue_name, *args, &block) ⇒ Object
-
#peek(queue_name) ⇒ Object
Public: Peek at the top item in the queue.
-
#ping ⇒ Object
Public: Which clients can actually reach their server.
-
#quit ⇒ Object
Public: Disconnect from each client’s server.
-
#reload ⇒ Object
Public: Reload the config of each client’s server.
-
#reserve(queue_name, options = {}) ⇒ Object
Public: Reserve the next item on the queue.
-
#rotate ⇒ Object
Private: Ensures that clients will be rotated by changing the client index and resetting the command count.
-
#rotate_for_next_op ⇒ Object
Private: Makes it so the client will rotate for the next operation.
-
#set(queue_name, item, expiration = 0) ⇒ Object
Public: Add an item to the given queue.
-
#shutdown ⇒ Object
Public: Tells each client to shutdown their server.
-
#stats ⇒ Object
Public: Return stats for each client’s server.
-
#version ⇒ Object
Public: Return the version of each server.
-
#with_retries ⇒ Object
Private: Retries an operation a number of times if it raises exception.
Constructor Details
#initialize(clients, options = {}) ⇒ Cluster
Public: Initialize a new cluster.
clients - An array of KJess::Client instances with port (localhost:1234) options - A Hash of options.
:commands_per_client - The Number of commands to run per client
before rotating to the next client (default: 100)
:retries - The Number of times a command should be retried (default: 5).
:instrumenter - Where to send instrumention (defaults: noop).
:encoder - What to use to dump/load vlues (defaults: mirror).
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/twirl/cluster.rb', line 55 def initialize(clients, = {}) @client_index = 0 @command_count = 0 @clients = clients.shuffle @retries = .fetch(:retries, 5) @commands_per_client = .fetch(:commands_per_client, 100) @instrumenter = .fetch(:instrumenter, Instrumenters::Noop) @encoder = .fetch(:encoder, Mirror) @retryable_errors = .fetch(:retryable_errors, RetryableErrors) end |
Instance Attribute Details
#client_index ⇒ Object (readonly)
Private: What is the array index of the client being used currently.
20 21 22 |
# File 'lib/twirl/cluster.rb', line 20 def client_index @client_index end |
#command_count ⇒ Object (readonly)
Private: The number of commands issued to the current client.
23 24 25 |
# File 'lib/twirl/cluster.rb', line 23 def command_count @command_count end |
#commands_per_client ⇒ Object (readonly)
Private: The number of commands to issue to a client before rotating.
29 30 31 |
# File 'lib/twirl/cluster.rb', line 29 def commands_per_client @commands_per_client end |
#encoder ⇒ Object (readonly)
Private: What handles dumping and loading values.
35 36 37 |
# File 'lib/twirl/cluster.rb', line 35 def encoder @encoder end |
#instrumenter ⇒ Object (readonly)
Private: What should be used to instrument all the things.
32 33 34 |
# File 'lib/twirl/cluster.rb', line 32 def instrumenter @instrumenter end |
#retries ⇒ Object (readonly)
Private: The number of times to retry retryable errors.
26 27 28 |
# File 'lib/twirl/cluster.rb', line 26 def retries @retries end |
#retryable_errors ⇒ Object (readonly)
Private: What errors should be considered retryable.
38 39 40 |
# File 'lib/twirl/cluster.rb', line 38 def retryable_errors @retryable_errors end |
Instance Method Details
#client ⇒ Object
Private: Returns the client to be used to issue a command.
219 220 221 222 223 224 |
# File 'lib/twirl/cluster.rb', line 219 def client rotate if @command_count >= @commands_per_client @command_count += 1 @clients[@client_index] end |
#client_read_op(client, op, queue_name, *args) ⇒ Object
Private: Perform an operation for a given client. Rotates clients if nil item is result of op.
Returns a Twirl::Item if an item was found, otherwise nil.
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'lib/twirl/cluster.rb', line 249 def client_read_op(client, op, queue_name, *args) with_retries { |tries| @instrumenter.instrument("op.twirl") { |payload| payload[:op] = op payload[:queue_name] = queue_name payload[:retry] = tries != @retries if value = client.send(op, queue_name, *args) payload[:bytes] = value.size value = @encoder.load(value) if value Item.new queue_name, value, client, @instrumenter else rotate_for_next_op nil end } } end |
#delete(queue_name) ⇒ Object
Public : Remove a queue.
queue_name - The String name of the queue.
Returns a Hash of hosts and results.
143 144 145 |
# File 'lib/twirl/cluster.rb', line 143 def delete(queue_name) multi_client_queue_op_with_result :delete, queue_name end |
#disconnect ⇒ Object
Public: Disconnect from each client’s server.
Returns nothing.
207 208 209 |
# File 'lib/twirl/cluster.rb', line 207 def disconnect multi_client_op :disconnect end |
#each(&block) ⇒ Object
Public: Iterate through the clients.
67 68 69 |
# File 'lib/twirl/cluster.rb', line 67 def each(&block) @clients.each { |client| yield client } end |
#flush(queue_name) ⇒ Object
Public: Remove all items from a queue.
queue_name - The String name of the queue.
Returns a Hash of hosts and results.
152 153 154 |
# File 'lib/twirl/cluster.rb', line 152 def flush(queue_name) multi_client_queue_op_with_result :flush, queue_name end |
#flush_all ⇒ Object
Public: Remove all items from all queues.
Returns a Hash of hosts and results.
159 160 161 |
# File 'lib/twirl/cluster.rb', line 159 def flush_all multi_client_op_with_result :flush_all end |
#get(queue_name, options = {}) ⇒ Object
Public: Retrieve an item from the given queue.
It is possible to send both :open and :close in the same get operation, but I would not recommend it. You will end up in a situation where the client will rotate and the :close then goes to the wrong client.
We could do two get operations if you pass both options, send the :close to the current client and send the :open as a second operation to the rotated client, but that seems sneaky.
queue_name - The String name of the queue. options - The Hash of options for retrieving an item.
See KJess::Client#get for all options.
Returns a Twirl::Item if an item was found, otherwise nil.
111 112 113 |
# File 'lib/twirl/cluster.rb', line 111 def get(queue_name, = {}) client_read_op client, :get, queue_name, end |
#multi_client_op(op, *args, &block) ⇒ Object
Private: Perform an op on all the clients.
269 270 271 272 273 274 275 276 277 278 279 280 281 |
# File 'lib/twirl/cluster.rb', line 269 def multi_client_op(op, *args, &block) @instrumenter.instrument("op.twirl") { |payload| payload[:op] = op @clients.each do |client| if block_given? yield client else client.send(op, *args) end end } end |
#multi_client_op_with_result(op, *args, &block) ⇒ Object
Private: Perform an op on all clients.
Returns a Hash of the servers as keys and the results as values.
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 |
# File 'lib/twirl/cluster.rb', line 303 def multi_client_op_with_result(op, *args, &block) @instrumenter.instrument("op.twirl") { |payload| payload[:op] = op result = {} @clients.each { |client| result["#{client.host}:#{client.port}"] = if block_given? yield client else client.send(op, *args) end } result } end |
#multi_client_queue_op_with_result(op, queue_name, *args, &block) ⇒ Object
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 |
# File 'lib/twirl/cluster.rb', line 283 def multi_client_queue_op_with_result(op, queue_name, *args, &block) @instrumenter.instrument("op.twirl") { |payload| payload[:op] = op payload[:queue_name] = queue_name result = {} @clients.each { |client| result["#{client.host}:#{client.port}"] = if block_given? yield client else client.send(op, queue_name, *args) end } result } end |
#peek(queue_name) ⇒ Object
Public: Peek at the top item in the queue.
queue_name - The String name of the queue.
Returns a Twirl::Item if an item was found, otherwise nil.
134 135 136 |
# File 'lib/twirl/cluster.rb', line 134 def peek(queue_name) client_read_op client, :peek, queue_name end |
#ping ⇒ Object
Public: Which clients can actually reach their server.
Returns Hash of hosts and results.
179 180 181 |
# File 'lib/twirl/cluster.rb', line 179 def ping multi_client_op_with_result :ping end |
#quit ⇒ Object
Public: Disconnect from each client’s server.
Returns Hash of hosts and results.
200 201 202 |
# File 'lib/twirl/cluster.rb', line 200 def quit multi_client_op_with_result :quit end |
#reload ⇒ Object
Public: Reload the config of each client’s server.
Returns Hash of hosts and results.
186 187 188 |
# File 'lib/twirl/cluster.rb', line 186 def reload multi_client_op_with_result :reload end |
#reserve(queue_name, options = {}) ⇒ Object
Public: Reserve the next item on the queue.
This is a helper method to get an item from a queue and open it for reliable read.
queue_name - The String name of the queue. options - Additional options.
See KJess::Client#get for all options.
Returns a Twirl::Item if an item was found, otherwise nil.
125 126 127 |
# File 'lib/twirl/cluster.rb', line 125 def reserve(queue_name, = {}) client_read_op client, :reserve, queue_name, end |
#rotate ⇒ Object
Private: Ensures that clients will be rotated by changing the client index and resetting the command count.
228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/twirl/cluster.rb', line 228 def rotate @instrumenter.instrument "op.twirl", { op: :rotate, metric_type: :counter, command_count: @command_count, commands_per_client: @commands_per_client, } @command_count = 0 @client_index = (@client_index + 1) % @clients.size end |
#rotate_for_next_op ⇒ Object
Private: Makes it so the client will rotate for the next operation.
241 242 243 |
# File 'lib/twirl/cluster.rb', line 241 def rotate_for_next_op @command_count = @commands_per_client end |
#set(queue_name, item, expiration = 0) ⇒ Object
Public: Add an item to the given queue.
queue_name - The String name of the queue. item - The String item to add to the queue. expiration - The Number of seconds from now to expire the item (default: 0).
Returns true if successful, false otherwise.
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/twirl/cluster.rb', line 78 def set(queue_name, item, expiration = 0) with_retries { |tries| @instrumenter.instrument("op.twirl") { |payload| payload[:op] = :set payload[:bytes] = item.to_s.size payload[:queue_name] = queue_name payload[:retry] = tries != @retries value = if item @encoder.dump(item) else nil end client.set(queue_name, value, expiration) } } end |
#shutdown ⇒ Object
Public: Tells each client to shutdown their server.
Returns nothing.
214 215 216 |
# File 'lib/twirl/cluster.rb', line 214 def shutdown multi_client_op :shutdown end |
#stats ⇒ Object
Public: Return stats for each client’s server.
Returns a Hash of stats for each host.
193 194 195 |
# File 'lib/twirl/cluster.rb', line 193 def stats multi_client_op_with_result :stats end |
#version ⇒ Object
Public: Return the version of each server.
Returns a Hash of hosts and results.
166 167 168 169 170 171 172 173 174 |
# File 'lib/twirl/cluster.rb', line 166 def version multi_client_op_with_result :version do |client| begin client.version rescue KJess::ProtocolError "unavailable" end end end |
#with_retries ⇒ Object
Private: Retries an operation a number of times if it raises exception.
320 321 322 323 324 325 326 327 328 |
# File 'lib/twirl/cluster.rb', line 320 def with_retries tries = @retries begin yield tries rescue *@retryable_errors tries -= 1 tries > 0 ? retry : raise end end |