Class: Couchbase::Bucket

Inherits:
Object
  • Object
show all
Includes:
Async, Operations
Defined in:
lib/couchbase/bucket.rb

Constant Summary collapse

DEFAULT_OPTIONS =
{
  type:                      nil,
  quiet:                     false,
  hostname:                  'localhost',
  port:                      8091,
  pool:                      'default',
  bucket:                    'default',
  password:                  '',
  engine:                    nil,
  default_ttl:               0,
  async:                     false,
  default_arithmetic_init:   0,
  default_flags:             0,
  default_format:            :document,
  default_observe_timeout:   2_500_000,
  on_error:                  nil,
  on_connect:                nil,
  timeout:                   0,
  environment:               nil,
  key_prefix:                nil,
  node_list:                 nil,
  destroying:                0,
  connected:                 0,
  on_connect_proc:           nil,
  async_disconnect_hook_set: 0,
  connected:                 false
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Async

#async, #async=, #async?, #async_queue, #end_async_queue, #run, #run_async, #running, #running=, #running?

Methods included from Operations

included

Constructor Details

#initialize(url, options = {}) ⇒ Bucket #initialize(options = {}) ⇒ Bucket

Initialize new Bucket.

Examples:

Initialize connection using default options

Couchbase.new

Select custom bucket

Couchbase.new(:bucket => 'foo')
Couchbase.new('http://localhost:8091/pools/default/buckets/foo')

Connect to protected bucket

Couchbase.new(:bucket => 'protected', :username => 'protected', :password => 'secret')
Couchbase.new('http://localhost:8091/pools/default/buckets/protected',
              :username => 'protected', :password => 'secret')

Use list of nodes, in case some nodes might be dead

Couchbase.new(:node_list => ['example.com:8091', 'example.org:8091', 'example.net'])

Overloads:

  • #initialize(url, options = {}) ⇒ Bucket

    Initialize bucket using URI of the cluster and options. It is possible to override some parts of URI using the options keys (e.g. :host or :port)

    Parameters:

    • url (String)

      The full URL of management API of the cluster.

    • options (Hash) (defaults to: {})

      The options for connection. See options definition below.

  • #initialize(options = {}) ⇒ Bucket

    Initialize bucket using options only.

    Parameters:

    • options (Hash) (defaults to: {})

      The options for operation for connection

    Options Hash (options):

    • :node_list (Array) — default: nil

      the list of nodes to connect to. If specified it takes precedence over :host option. The list must be array of strings in form of host names or host names with ports (in first case port 8091 will be used, see examples).

    • :host (String) — default: "localhost"

      the hostname or IP address of the node

    • :port (Fixnum) — default: 8091

      the port of the managemenent API

    • :pool (String) — default: "default"

      the pool name

    • :bucket (String) — default: "default"

      the bucket name

    • :default_ttl (Fixnum) — default: 0

      the TTL used by default during storing key-value pairs.

    • :default_flags (Fixnum) — default: 0

      the default flags.

    • :default_format (Symbol) — default: :document

      the format, which will be used for values by default. Note that changing format will amend flags. (see #default_format)

    • :username (String) — default: nil

      the user name to connect to the cluster. Used to authenticate on management API. The username could be skipped for protected buckets, the bucket name will be used instead.

    • :password (String) — default: nil

      the password of the user.

    • :quiet (true, false) — default: false

      the flag controlling if raising exception when the client executes operations on non-existent keys. If it is true it will raise Error::NotFound exceptions. The default behaviour is to return nil value silently (might be useful in Rails cache).

    • :environment (Symbol) — default: :production

      the mode of the connection. Currently it influences only on design documents set. If the environment is :development, you will able to get design documents with ‘dev_’ prefix, otherwise (in :production mode) the library will hide them from you.

    • :key_prefix (String) — default: nil

      the prefix string which will be prepended to each key before sending out, and sripped before returning back to the application.

    • :timeout (Fixnum) — default: 2500000

      the timeout for IO operations (in microseconds)

    • :default_arithmetic_init (Fixnum, true) — default: 0

      the default initial value for arithmetic operations. Setting this option to any non positive number forces creation missing keys with given default value. Setting it to true will use zero as initial value. (see Bucket#incr and Bucket#decr).

    • :engine (Symbol) — default: :default

      the IO engine to use Currently following engines are supported:

      :default

      Built-in engine (multi-thread friendly)

      :libevent

      libevent IO plugin from libcouchbase (optional)

      :libev

      libev IO plugin from libcouchbase (optional)

      :eventmachine

      EventMachine plugin (builtin, but requires EM gem and ruby 1.9+)

    • :async (true, false) — default: false

      If true, the connection instance will be considered always asynchronous and IO interaction will be occured only when #run called. See #on_connect to hook your code after the instance will be connected.

Raises:

  • (Couchbase::Error::BucketNotFound)

    if there is no such bucket to connect to

  • (Couchbase::Error::Connect)

    if the socket wasn’t accessible (doesn’t accept connections or doesn’t respond in time)

Since:

  • 1.0.0



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/couchbase/bucket.rb', line 164

def initialize(url = nil, options = {})
  url_options = if url.is_a? String
                  fail ArgumentError.new unless url =~ /^http:\/\//
                  uri = URI.new(url)
                  { hostname: uri.host, port: uri.port }.
                    merge(path_to_pool_and_bucket(uri.path))
                elsif url.nil?
                  {}
                else
                  url
                end

  options = Couchbase.normalize_connection_options(options)

  connection_options = DEFAULT_OPTIONS.merge(url_options).merge(options)

  connection_options.each_pair do |key, value|
    instance_variable_set("@#{key}", value)
  end

  @transcoders = {
    document: Transcoder::Document.new,
    marshal:  Transcoder::Marshal.new,
    plain:    Transcoder::Plain.new
  }

  connect unless async?
end

Instance Attribute Details

#bucketObject

Returns the value of attribute bucket.



38
39
40
# File 'lib/couchbase/bucket.rb', line 38

def bucket
  @bucket
end

#clientObject (readonly)

Returns the value of attribute client.



42
43
44
# File 'lib/couchbase/bucket.rb', line 42

def client
  @client
end

#default_arithmetic_initObject

Returns the value of attribute default_arithmetic_init.



38
39
40
# File 'lib/couchbase/bucket.rb', line 38

def default_arithmetic_init
  @default_arithmetic_init
end

#default_formatObject

Returns the value of attribute default_format.



38
39
40
# File 'lib/couchbase/bucket.rb', line 38

def default_format
  @default_format
end

#default_ttlObject

Returns the value of attribute default_ttl.



38
39
40
# File 'lib/couchbase/bucket.rb', line 38

def default_ttl
  @default_ttl
end

#hostnameObject

Returns the value of attribute hostname.



38
39
40
# File 'lib/couchbase/bucket.rb', line 38

def hostname
  @hostname
end

#key_prefixObject (readonly)

Returns the value of attribute key_prefix.



42
43
44
# File 'lib/couchbase/bucket.rb', line 42

def key_prefix
  @key_prefix
end

#passwordObject

Returns the value of attribute password.



38
39
40
# File 'lib/couchbase/bucket.rb', line 38

def password
  @password
end

#poolObject

Returns the value of attribute pool.



38
39
40
# File 'lib/couchbase/bucket.rb', line 38

def pool
  @pool
end

#portObject

Returns the value of attribute port.



38
39
40
# File 'lib/couchbase/bucket.rb', line 38

def port
  @port
end

#quietObject

Returns the value of attribute quiet.



38
39
40
# File 'lib/couchbase/bucket.rb', line 38

def quiet
  @quiet
end

#timeoutObject

Returns the value of attribute timeout.



38
39
40
# File 'lib/couchbase/bucket.rb', line 38

def timeout
  @timeout
end

#transcoderObject

Returns the value of attribute transcoder.



38
39
40
# File 'lib/couchbase/bucket.rb', line 38

def transcoder
  @transcoder
end

#usernameObject

Returns the value of attribute username.



38
39
40
# File 'lib/couchbase/bucket.rb', line 38

def username
  @username
end

Instance Method Details

#authorityObject



224
225
226
# File 'lib/couchbase/bucket.rb', line 224

def authority
  "#{hostname}:#{port}"
end

#base_urlObject



228
229
230
# File 'lib/couchbase/bucket.rb', line 228

def base_url
  "http://#{authority}/pools"
end

#cas(key, options = {}) {|value| ... } ⇒ Fixnum Also known as: compare_and_swap

Compare and swap value.

Reads a key’s value from the server and yields it to a block. Replaces the key’s value with the result of the block as long as the key hasn’t been updated in the meantime, otherwise raises Error::KeyExists. CAS stands for “compare and swap”, and avoids the need for manual key mutexing. Read more info here:

In asynchronous mode it will yield result twice, first for Bucket#get with Result#operation equal to :get and second time for Bucket#set with Result#operation equal to :set.

Examples:

Implement append to JSON encoded value


c.default_format = :document
c.set("foo", {"bar" => 1})
c.cas("foo") do |val|
  val["baz"] = 2
  val
end
c.get("foo")      #=> {"bar" => 1, "baz" => 2}

Append JSON encoded value asynchronously


c.default_format = :document
c.set("foo", {"bar" => 1})
c.run do
  c.cas("foo") do |val|
    case val.operation
    when :get
      val["baz"] = 2
      val
    when :set
      # verify all is ok
      puts "error: #{ret.error.inspect}" unless ret.success?
    end
  end
end
c.get("foo")      #=> {"bar" => 1, "baz" => 2}

Parameters:

  • key (String, Symbol)
  • options (Hash) (defaults to: {})

    the options for “swap” part

Options Hash (options):

  • :ttl (Fixnum) — default: self.default_ttl

    the time to live of this key

  • :format (Symbol) — default: self.default_format

    format of the value

  • :flags (Fixnum) — default: self.default_flags

    flags for this key

Yield Parameters:

  • value (Object, Result)

    old value in synchronous mode and Result object in asynchronous mode.

Yield Returns:

  • (Object)

    new value.

Returns:

  • (Fixnum)

    the CAS of new value

Raises:

  • (Couchbase::Error::KeyExists)

    if the key was updated before the the code in block has been completed (the CAS value has been changed).

  • (ArgumentError)

    if the block is missing for async mode

See Also:

Since:

  • 1.0.0



331
332
333
334
335
336
337
338
339
340
341
342
343
# File 'lib/couchbase/bucket.rb', line 331

def cas(key, options = {})
  if async?
    block = Proc.new
    get(key) do |ret|
      val = block.call(ret) # get new value from caller
      set(ret.key, val, options.merge(:cas => ret.cas, &block))
    end
  else
    val, flags, ver = get(key, :extended => true)
    val = yield(val) # get new value from caller
    set(key, val, options.merge(:cas => ver))
  end
end

#connectObject Also known as: reconnect



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/couchbase/bucket.rb', line 201

def connect
  uris = if @node_list
           Array(@node_list).map { |n| URI.new(n) }
         else
           Array(URI.new(base_url))
         end

  begin
    builder = CouchbaseConnectionFactoryBuilder.new
    builder.setTranscoder(transcoder)
    connection_factory = builder.buildCouchbaseConnection(uris, bucket.to_java_string, password.to_java_string)
    @client = CouchbaseClient.new(connection_factory)
    @connected = true
  rescue Java::ComCouchbaseClientVbucket::ConfigurationException
    fail Couchbase::Error::Auth, 'Couchbase configurations are incorrect.'
  rescue java.net.ConnectException => e
    fail Couchbase::Error::Connect
  end

  self
end

#connected?Boolean

Returns:

  • (Boolean)


236
237
238
# File 'lib/couchbase/bucket.rb', line 236

def connected?
  @connected
end

#create_periodic_timer(interval, &block) ⇒ Couchbase::Timer

Create and register periodic timer

Returns:

  • (Couchbase::Timer)


397
398
399
# File 'lib/couchbase/bucket.rb', line 397

def create_periodic_timer(interval, &block)
  Timer.new(self, interval, :periodic => true, &block)
end

#create_timer(interval, &block) ⇒ Couchbase::Timer

Create and register one-shot timer

Returns:

  • (Couchbase::Timer)


390
391
392
# File 'lib/couchbase/bucket.rb', line 390

def create_timer(interval, &block)
  Timer.new(self, interval, &block)
end

#disconnectObject



240
241
242
243
244
245
246
247
248
249
# File 'lib/couchbase/bucket.rb', line 240

def disconnect
  if connected?
    @client.shutdown(3, TimeUnit::SECONDS)
    @client = nil
    @connection_factory = nil
    @connected = false
  else
    fail Couchbase::Error::Connect
  end
end

#flush {|ret| ... } ⇒ true

Delete contents of the bucket

Examples:

Simple flush the bucket

c.flush    #=> true

Asynchronous flush

c.run do
  c.flush do |ret|
    ret.operation   #=> :flush
    ret.success?    #=> true
    ret.status      #=> 200
  end
end

Yield Parameters:

  • ret (Result)

    the object with error, status and operation attributes.

Returns:

  • (true)

    always return true (see raise section)

Raises:

  • (Couchbase::Error::Protocol)

    in case of an error is encountered. Check Error::Base#status for detailed code.

See Also:

Since:

  • 1.2.0.beta



371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# File 'lib/couchbase/bucket.rb', line 371

def flush
  if !async? && block_given?
    sync_block_error
  end
  req = make_http_request("/pools/default/buckets/#{bucket}/controller/doFlush",
                          :type => :management, :method => :post, :extended => true)
  res = nil
  req.on_body do |r|
    res = r
    res.instance_variable_set("@operation", :flush)
    yield(res) if block_given?
  end
  req.continue
  true
end

#hostObject



197
198
199
# File 'lib/couchbase/bucket.rb', line 197

def host
  hostname
end

#observe_and_wait(*keys, &block) ⇒ Fixnum, Hash<String, Fixnum>

Wait for persistence condition

This operation is useful when some confidence needed regarding the state of the keys. With two parameters :replicated and :persisted it allows to set up the waiting rule.

Parameters:

  • keys (String, Symbol, Array, Hash)

    The list of the keys to observe. Full form is hash with key-cas value pairs, but there are also shortcuts like just Array of keys or single key. CAS value needed to when you need to ensure that the storage persisted exactly the same version of the key you are asking to observe.

  • options (Hash)

    The options for operation

Returns:

  • (Fixnum, Hash<String, Fixnum>)

    will return CAS value just like mutators or pairs key-cas in case of multiple keys.

Raises:

  • (Couchbase::Error::Timeout)

    if the given time is up

Since:

  • 1.2.0.dp6



425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
# File 'lib/couchbase/bucket.rb', line 425

def observe_and_wait(*keys, &block)
  options = {:timeout => default_observe_timeout}
  options.update(keys.pop) if keys.size > 1 && keys.last.is_a?(Hash)
  verify_observe_options(options)
  if block && !async?
    raise ArgumentError, "synchronous mode doesn't support callbacks"
  end
  if keys.size == 0
    raise ArgumentError, "at least one key is required"
  end
  if keys.size == 1 && keys[0].is_a?(Hash)
    key_cas = keys[0]
  else
    key_cas = keys.flatten.reduce({}) do |h, kk|
      h[kk] = nil   # set CAS to nil
      h
    end
  end
  if async?
    do_observe_and_wait(key_cas, options, &block)
  else
    res = do_observe_and_wait(key_cas, options, &block) while res.nil?
    unless async?
      if keys.size == 1 && (keys[0].is_a?(String) || keys[0].is_a?(Symbol))
        return res.values.first
      else
        return res
      end
    end
  end
end

#on_connect(&block) ⇒ Object



255
256
257
# File 'lib/couchbase/bucket.rb', line 255

def on_connect(&block)
  @on_connect = block
end

#on_error(&block) ⇒ Object



259
260
261
# File 'lib/couchbase/bucket.rb', line 259

def on_error(&block)
  @on_error = block
end

#quiet?Boolean

Returns:

  • (Boolean)


193
194
195
# File 'lib/couchbase/bucket.rb', line 193

def quiet?
  !!quiet
end

#urlObject



232
233
234
# File 'lib/couchbase/bucket.rb', line 232

def url
  "http://#{authority}/pools/#{pool}/buckets/#{bucket}/"
end

#versionObject



263
264
265
266
267
268
269
# File 'lib/couchbase/bucket.rb', line 263

def version
  {}.tap do |hash|
    @client.getVersions.to_hash.each_pair do |ip, ver|
      hash[ip.to_s] = ver
    end
  end
end