Class: Kestrel::Client

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
StatsHelper
Defined in:
lib/kestrel/client.rb,
lib/kestrel/client/json.rb,
lib/kestrel/client/proxy.rb,
lib/kestrel/client/blocking.rb,
lib/kestrel/client/envelope.rb,
lib/kestrel/client/namespace.rb,
lib/kestrel/client/unmarshal.rb,
lib/kestrel/client/partitioning.rb

Defined Under Namespace

Modules: StatsHelper Classes: Blocking, Envelope, Json, Namespace, Partitioning, Proxy, Transactional, Unmarshal

Constant Summary collapse

KESTREL_OPTIONS =
[:gets_per_server, :exception_retry_limit, :get_timeout_ms].freeze
DEFAULT_OPTIONS =
{
  :retry_timeout => 0,
  :exception_retry_limit => 5,
  :timeout => 0.25,
  :gets_per_server => 100,
  :get_timeout_ms => 10
}.freeze
RECOVERABLE_ERRORS =

Exceptions which are connection failures we retry after

[
  Memcached::ServerIsMarkedDead,
  Memcached::ATimeoutOccurred,
  Memcached::ConnectionBindFailure,
  Memcached::ConnectionFailure,
  Memcached::ConnectionSocketCreateFailure,
  Memcached::Failure,
  Memcached::MemoryAllocationFailure,
  Memcached::ReadFailure,
  Memcached::ServerError,
  Memcached::SystemError,
  Memcached::UnknownReadFailure,
  Memcached::WriteFailure,
  Memcached::NotFound
]

Constants included from StatsHelper

StatsHelper::QUEUE_STAT_NAMES, StatsHelper::STATS_TIMEOUT

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from StatsHelper

#available_queues, #sizeof, #stat, #stats

Constructor Details

#initialize(*servers) ⇒ Client

Returns a new instance of Client.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/kestrel/client.rb', line 51

def initialize(*servers)
  opts = servers.last.is_a?(Hash) ? servers.pop : {}
  opts = DEFAULT_OPTIONS.merge(opts)

  @kestrel_options = extract_kestrel_options!(opts)
  @default_get_timeout = kestrel_options[:get_timeout_ms]
  @gets_per_server = kestrel_options[:gets_per_server]
  @exception_retry_limit = kestrel_options[:exception_retry_limit]
  @counter = 0
  @shuffle = true

  # we handle our own retries so that we can apply different
  # policies to sets and gets, so set memcached limit to 0
  opts[:exception_retry_limit] = 0
  opts[:distribution] = :random # force random distribution

  self.servers  = Array(servers).flatten.compact
  self.options  = opts

  @server_count = self.servers.size # Minor optimization.
  @read_client  = Memcached.new(self.servers[rand(@server_count)], opts)
  @write_client = Memcached.new(self.servers[rand(@server_count)], opts)
end

Instance Attribute Details

#current_queueObject (readonly)

Returns the value of attribute current_queue.



47
48
49
# File 'lib/kestrel/client.rb', line 47

def current_queue
  @current_queue
end

#current_serverObject (readonly)

Returns the value of attribute current_server.



47
48
49
# File 'lib/kestrel/client.rb', line 47

def current_server
  @current_server
end

#kestrel_optionsObject (readonly)

Returns the value of attribute kestrel_options.



47
48
49
# File 'lib/kestrel/client.rb', line 47

def kestrel_options
  @kestrel_options
end

#optionsObject

Returns the value of attribute options.



46
47
48
# File 'lib/kestrel/client.rb', line 46

def options
  @options
end

#serversObject

Returns the value of attribute servers.



46
47
48
# File 'lib/kestrel/client.rb', line 46

def servers
  @servers
end

Instance Method Details

#delete(key, expiry = 0) ⇒ Object



75
76
77
78
# File 'lib/kestrel/client.rb', line 75

def delete(key, expiry=0)
  with_retries { @write_client.delete key }
rescue Memcached::NotFound, Memcached::ServerEnd
end

#flush(queue) ⇒ Object



137
138
139
140
141
142
143
# File 'lib/kestrel/client.rb', line 137

def flush(queue)
  count = 0
  while sizeof(queue) > 0
    count += 1 while get queue, :raw => true
  end
  count
end

#get(key, opts = {}) ⇒ Object

Parameters

key<String>

Queue name

opts<Boolean,Hash>

True/false toggles Marshalling. A Hash allows collision-avoiding options support.

Options (opts)

:open<Boolean>

Begins a transactional read.

:close<Boolean>

Ends a transactional read.

:abort<Boolean>

Cancels an existing transactional read

:peek<Boolean>

Return the head of the queue, without removal

:timeout<Integer>

Milliseconds to block for a new item

:raw<Boolean>

Toggles Marshalling. Equivalent to the “old style” second argument.



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/kestrel/client.rb', line 113

def get(key, opts = {})
  raw = opts[:raw] || false
  commands = extract_queue_commands(opts)

  val =
    begin
      shuffle_if_necessary! key
      @read_client.get key + commands, !raw
    rescue *RECOVERABLE_ERRORS
      # we can't tell the difference between a server being down
      # and an empty queue, so just return nil. our sticky server
      # logic should eliminate piling on down servers
      nil
    end

  # nil result without :close and :abort, force next get to jump from
  # current server
  if !val && @shuffle && !opts[:close] && !opts[:abort]
    @counter = @gets_per_server
  end

  val
end

#get_from_last(*args) ⇒ Object

This provides the necessary semantic to support transactionality in the Transactional client. It temporarily disables server shuffling to allow the client to close any open transactions on the current server before jumping.



92
93
94
95
96
97
# File 'lib/kestrel/client.rb', line 92

def get_from_last(*args)
  @shuffle = false
  get *args
ensure
  @shuffle = true
end

#peek(queue) ⇒ Object



145
146
147
# File 'lib/kestrel/client.rb', line 145

def peek(queue)
  get queue, :peek => true
end

#set(key, value, ttl = 0, raw = false) ⇒ Object



80
81
82
83
84
85
# File 'lib/kestrel/client.rb', line 80

def set(key, value, ttl=0, raw=false)
  with_retries { @write_client.set key, value, ttl, !raw }
  true
rescue Memcached::NotStored
  false
end