Class: Twirl::Cluster

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Enumerable
Defined in:
lib/twirl/cluster.rb

Constant Summary collapse

RetryableErrors =

Private: The default Array of errors to retry.

[
  KJess::NetworkError,
  # this inherits from protocol error, but seems like it should be retried
  KJess::ServerError,
]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(clients, options = {}) ⇒ Cluster

Public: Initialize a new cluster.

clients - An array of KJess::Client instances with port (localhost:1234) options - A Hash of options.

:commands_per_client - The Number of commands to run per client
                       before rotating to the next client (default: 100)
:retries - The Number of times a command should be retried (default: 5).
:instrumenter - Where to send instrumention (defaults: noop).
:encoder - What to use to dump/load vlues (defaults: mirror).


55
56
57
58
59
60
61
62
63
64
# File 'lib/twirl/cluster.rb', line 55

def initialize(clients, options = {})
  @client_index = 0
  @command_count = 0
  @clients = clients.shuffle
  @retries = options.fetch(:retries, 5)
  @commands_per_client = options.fetch(:commands_per_client, 100)
  @instrumenter = options.fetch(:instrumenter, Instrumenters::Noop)
  @encoder = options.fetch(:encoder, Mirror)
  @retryable_errors = options.fetch(:retryable_errors, RetryableErrors)
end

Instance Attribute Details

#client_indexObject (readonly)

Private: What is the array index of the client being used currently.



20
21
22
# File 'lib/twirl/cluster.rb', line 20

def client_index
  @client_index
end

#command_countObject (readonly)

Private: The number of commands issued to the current client.



23
24
25
# File 'lib/twirl/cluster.rb', line 23

def command_count
  @command_count
end

#commands_per_clientObject (readonly)

Private: The number of commands to issue to a client before rotating.



29
30
31
# File 'lib/twirl/cluster.rb', line 29

def commands_per_client
  @commands_per_client
end

#encoderObject (readonly)

Private: What handles dumping and loading values.



35
36
37
# File 'lib/twirl/cluster.rb', line 35

def encoder
  @encoder
end

#instrumenterObject (readonly)

Private: What should be used to instrument all the things.



32
33
34
# File 'lib/twirl/cluster.rb', line 32

def instrumenter
  @instrumenter
end

#retriesObject (readonly)

Private: The number of times to retry retryable errors.



26
27
28
# File 'lib/twirl/cluster.rb', line 26

def retries
  @retries
end

#retryable_errorsObject (readonly)

Private: What errors should be considered retryable.



38
39
40
# File 'lib/twirl/cluster.rb', line 38

def retryable_errors
  @retryable_errors
end

Instance Method Details

#clientObject

Private: Returns the client to be used to issue a command.



219
220
221
222
223
224
# File 'lib/twirl/cluster.rb', line 219

def client
  rotate if @command_count >= @commands_per_client

  @command_count += 1
  @clients[@client_index]
end

#client_read_op(client, op, queue_name, *args) ⇒ Object

Private: Perform an operation for a given client. Rotates clients if nil item is result of op.

Returns a Twirl::Item if an item was found, otherwise nil.



249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/twirl/cluster.rb', line 249

def client_read_op(client, op, queue_name, *args)
  with_retries { |tries|
    @instrumenter.instrument("op.twirl") { |payload|
      payload[:op] = op
      payload[:queue_name] = queue_name
      payload[:retry] = tries != @retries

      if value = client.send(op, queue_name, *args)
        payload[:bytes] = value.size
        value = @encoder.load(value) if value
        Item.new queue_name, value, client, @instrumenter
      else
        rotate_for_next_op
        nil
      end
    }
  }
end

#delete(queue_name) ⇒ Object

Public : Remove a queue.

queue_name - The String name of the queue.

Returns a Hash of hosts and results.



143
144
145
# File 'lib/twirl/cluster.rb', line 143

def delete(queue_name)
  multi_client_queue_op_with_result :delete, queue_name
end

#disconnectObject

Public: Disconnect from each client’s server.

Returns nothing.



207
208
209
# File 'lib/twirl/cluster.rb', line 207

def disconnect
  multi_client_op :disconnect
end

#each(&block) ⇒ Object

Public: Iterate through the clients.



67
68
69
# File 'lib/twirl/cluster.rb', line 67

def each(&block)
  @clients.each { |client| yield client }
end

#flush(queue_name) ⇒ Object

Public: Remove all items from a queue.

queue_name - The String name of the queue.

Returns a Hash of hosts and results.



152
153
154
# File 'lib/twirl/cluster.rb', line 152

def flush(queue_name)
  multi_client_queue_op_with_result :flush, queue_name
end

#flush_allObject

Public: Remove all items from all queues.

Returns a Hash of hosts and results.



159
160
161
# File 'lib/twirl/cluster.rb', line 159

def flush_all
  multi_client_op_with_result :flush_all
end

#get(queue_name, options = {}) ⇒ Object

Public: Retrieve an item from the given queue.

It is possible to send both :open and :close in the same get operation, but I would not recommend it. You will end up in a situation where the client will rotate and the :close then goes to the wrong client.

We could do two get operations if you pass both options, send the :close to the current client and send the :open as a second operation to the rotated client, but that seems sneaky.

queue_name - The String name of the queue. options - The Hash of options for retrieving an item.

See KJess::Client#get for all options.

Returns a Twirl::Item if an item was found, otherwise nil.



111
112
113
# File 'lib/twirl/cluster.rb', line 111

def get(queue_name, options = {})
  client_read_op client, :get, queue_name, options
end

#multi_client_op(op, *args, &block) ⇒ Object

Private: Perform an op on all the clients.



269
270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/twirl/cluster.rb', line 269

def multi_client_op(op, *args, &block)
  @instrumenter.instrument("op.twirl") { |payload|
    payload[:op] = op

    @clients.each do |client|
      if block_given?
        yield client
      else
        client.send(op, *args)
      end
    end
  }
end

#multi_client_op_with_result(op, *args, &block) ⇒ Object

Private: Perform an op on all clients.

Returns a Hash of the servers as keys and the results as values.



303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
# File 'lib/twirl/cluster.rb', line 303

def multi_client_op_with_result(op, *args, &block)
  @instrumenter.instrument("op.twirl") { |payload|
    payload[:op] = op

    result = {}
    @clients.each { |client|
      result["#{client.host}:#{client.port}"] = if block_given?
        yield client
      else
        client.send(op, *args)
      end
    }
    result
  }
end

#multi_client_queue_op_with_result(op, queue_name, *args, &block) ⇒ Object



283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/twirl/cluster.rb', line 283

def multi_client_queue_op_with_result(op, queue_name, *args, &block)
  @instrumenter.instrument("op.twirl") { |payload|
    payload[:op] = op
    payload[:queue_name] = queue_name

    result = {}
    @clients.each { |client|
      result["#{client.host}:#{client.port}"] = if block_given?
        yield client
      else
        client.send(op, queue_name, *args)
      end
    }
    result
  }
end

#peek(queue_name) ⇒ Object

Public: Peek at the top item in the queue.

queue_name - The String name of the queue.

Returns a Twirl::Item if an item was found, otherwise nil.



134
135
136
# File 'lib/twirl/cluster.rb', line 134

def peek(queue_name)
  client_read_op client, :peek, queue_name
end

#pingObject

Public: Which clients can actually reach their server.

Returns Hash of hosts and results.



179
180
181
# File 'lib/twirl/cluster.rb', line 179

def ping
  multi_client_op_with_result :ping
end

#quitObject

Public: Disconnect from each client’s server.

Returns Hash of hosts and results.



200
201
202
# File 'lib/twirl/cluster.rb', line 200

def quit
  multi_client_op_with_result :quit
end

#reloadObject

Public: Reload the config of each client’s server.

Returns Hash of hosts and results.



186
187
188
# File 'lib/twirl/cluster.rb', line 186

def reload
  multi_client_op_with_result :reload
end

#reserve(queue_name, options = {}) ⇒ Object

Public: Reserve the next item on the queue.

This is a helper method to get an item from a queue and open it for reliable read.

queue_name - The String name of the queue. options - Additional options.

See KJess::Client#get for all options.

Returns a Twirl::Item if an item was found, otherwise nil.



125
126
127
# File 'lib/twirl/cluster.rb', line 125

def reserve(queue_name, options = {})
  client_read_op client, :reserve, queue_name, options
end

#rotateObject

Private: Ensures that clients will be rotated by changing the client index and resetting the command count.



228
229
230
231
232
233
234
235
236
237
238
# File 'lib/twirl/cluster.rb', line 228

def rotate
  @instrumenter.instrument "op.twirl", {
    op: :rotate,
    metric_type: :counter,
    command_count: @command_count,
    commands_per_client: @commands_per_client,
  }

  @command_count = 0
  @client_index = (@client_index + 1) % @clients.size
end

#rotate_for_next_opObject

Private: Makes it so the client will rotate for the next operation.



241
242
243
# File 'lib/twirl/cluster.rb', line 241

def rotate_for_next_op
  @command_count = @commands_per_client
end

#set(queue_name, item, expiration = 0) ⇒ Object

Public: Add an item to the given queue.

queue_name - The String name of the queue. item - The String item to add to the queue. expiration - The Number of seconds from now to expire the item (default: 0).

Returns true if successful, false otherwise.



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/twirl/cluster.rb', line 78

def set(queue_name, item, expiration = 0)
  with_retries { |tries|
    @instrumenter.instrument("op.twirl") { |payload|
      payload[:op] = :set
      payload[:bytes] = item.to_s.size
      payload[:queue_name] = queue_name
      payload[:retry] = tries != @retries

      value = if item
        @encoder.dump(item)
      else
        nil
      end
      client.set(queue_name, value, expiration)
    }
  }
end

#shutdownObject

Public: Tells each client to shutdown their server.

Returns nothing.



214
215
216
# File 'lib/twirl/cluster.rb', line 214

def shutdown
  multi_client_op :shutdown
end

#statsObject

Public: Return stats for each client’s server.

Returns a Hash of stats for each host.



193
194
195
# File 'lib/twirl/cluster.rb', line 193

def stats
  multi_client_op_with_result :stats
end

#versionObject

Public: Return the version of each server.

Returns a Hash of hosts and results.



166
167
168
169
170
171
172
173
174
# File 'lib/twirl/cluster.rb', line 166

def version
  multi_client_op_with_result :version do |client|
    begin
      client.version
    rescue KJess::ProtocolError
      "unavailable"
    end
  end
end

#with_retriesObject

Private: Retries an operation a number of times if it raises exception.



320
321
322
323
324
325
326
327
328
# File 'lib/twirl/cluster.rb', line 320

def with_retries
  tries = @retries
  begin
    yield tries
  rescue *@retryable_errors
    tries -= 1
    tries > 0 ? retry : raise
  end
end