Class: Redis::Distributed

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/distributed.rb

Defined Under Namespace

Classes: CannotDistribute

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(node_configs, options = {}) ⇒ Distributed

Returns a new instance of Distributed.



20
21
22
23
24
25
26
27
28
# File 'lib/redis/distributed.rb', line 20

# Creates a distributed client and registers one Redis node per configuration.
#
# @param node_configs [Array] node configurations; each entry is duplicated and
#   kept for later reuse, then handed to #add_node
# @param options [Hash] defaults merged into every node's options; :tag replaces
#   the key-tag pattern and :ring replaces the default HashRing
def initialize(node_configs, options = {})
  @tag = options[:tag] || /^\{(.+?)\}/
  @ring = options[:ring] || HashRing.new
  @default_options = options.dup
  @node_configs = node_configs.map { |config| config.dup }

  node_configs.each do |config|
    add_node(config)
  end

  @subscribed_node = nil
  @watch_key = nil
end

Instance Attribute Details

#ring ⇒ Object (readonly)

Returns the value of attribute ring.



18
19
20
# File 'lib/redis/distributed.rb', line 18

# Returns the ring stored in @ring (set in #initialize from options[:ring] or a
# new HashRing); #add_node registers nodes on it.
def ring
  @ring
end

Instance Method Details

#[](key) ⇒ Object



410
411
412
# File 'lib/redis/distributed.rb', line 410

# Hash-style read access; forwards to #get.
def [](key)
  get(key)
end

#[]=(key, value) ⇒ Object



414
415
416
# File 'lib/redis/distributed.rb', line 414

# Hash-style write access; forwards to #set without extra options.
def []=(key, value)
  set(key, value)
end

#_bpop(cmd, args) ⇒ Object



476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
# File 'lib/redis/distributed.rb', line 476

# Shared implementation of the blocking pop commands.
#
# @param cmd [Symbol] name of the node method to invoke
# @param args [Array] keys (possibly nested one level), optionally followed by
#   an options Hash; only its :timeout entry is used. The array is mutated.
# @return the reply of the node yielded by ensure_same_node
def _bpop(cmd, args)
  # A trailing Hash carries the options; strip it so only keys remain.
  timeout = args.last.is_a?(Hash) ? args.pop[:timeout] : nil
  args.flatten!(1)

  ensure_same_node(cmd, args) do |node|
    next node.__send__(cmd, args) unless timeout

    node.__send__(cmd, args, timeout: timeout)
  end
end

#_eval(cmd, args) ⇒ Object



1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
# File 'lib/redis/distributed.rb', line 1042

# Shared implementation of #eval and #evalsha.
#
# @param cmd [Symbol] node method to invoke
# @param args [Array] script (or SHA), then optional keys and argv arrays,
#   optionally followed by a Hash with :keys / :argv. The array is mutated.
# @return the reply of the node yielded by ensure_same_node for +keys+
def _eval(cmd, args)
  script = args.shift
  options = args.last.is_a?(Hash) ? args.pop : {}

  # Positional arrays win over the :keys / :argv options.
  keys = args.shift || options[:keys] || []
  argv = args.shift || options[:argv] || []

  ensure_same_node(cmd, keys) { |node| node.send(cmd, script, keys, argv) }
end

#add_node(options) ⇒ Object



41
42
43
44
45
46
47
# File 'lib/redis/distributed.rb', line 41

# Builds a Redis client for one node and registers it on the ring.
#
# @param options [String, Hash] a URL string (wrapped as { url: ... }) or an
#   options Hash; merged over the defaults given to the constructor
def add_node(options)
  node_options = options.is_a?(String) ? { url: options } : options
  node_options = @default_options.merge(node_options)
  # :tag and :ring are removed before the options reach Redis.new.
  %i[tag ring].each { |routing_option| node_options.delete(routing_option) }
  @ring.add_node(Redis.new(node_options))
end

#append(key, value) ⇒ Object

Append a value to a key.



378
379
380
# File 'lib/redis/distributed.rb', line 378

# Append a value to a key; runs on the node #node_for picks for +key+.
def append(key, value)
  node = node_for(key)
  node.append(key, value)
end

#bgsave ⇒ Object

Asynchronously save the dataset to disk.



74
75
76
# File 'lib/redis/distributed.rb', line 74

# Asynchronously save the dataset to disk; dispatched via on_each_node.
def bgsave
  on_each_node(:bgsave)
end

#bitcount(key, start = 0, stop = -1, scale: nil) ⇒ Object

Count the number of set bits in a range of the string value stored at key.



383
384
385
# File 'lib/redis/distributed.rb', line 383

# Count the number of set bits in a range of the string value stored at key.
# Runs on the node #node_for picks for +key+; +scale+ is forwarded unchanged.
def bitcount(key, start = 0, stop = -1, scale: nil)
  node = node_for(key)
  node.bitcount(key, start, stop, scale: scale)
end

#bitop(operation, destkey, *keys) ⇒ Object

Perform a bitwise operation between strings and store the resulting string in a key.



388
389
390
391
392
393
# File 'lib/redis/distributed.rb', line 388

# Perform a bitwise operation between strings and store the result in a key.
# The destination and all source keys are passed to ensure_same_node, which
# yields the node the command runs on.
def bitop(operation, destkey, *keys)
  keys.flatten!(1)
  involved = [destkey, *keys]
  ensure_same_node(:bitop, involved) { |node| node.bitop(operation, destkey, keys) }
end

#bitpos(key, bit, start = nil, stop = nil, scale: nil) ⇒ Object

Return the position of the first bit set to 1 or 0 in a string.



396
397
398
# File 'lib/redis/distributed.rb', line 396

# Return the position of the first bit set to 1 or 0 in a string.
# Runs on the node #node_for picks for +key+.
def bitpos(key, bit, start = nil, stop = nil, scale: nil)
  node = node_for(key)
  node.bitpos(key, bit, start, stop, scale: scale)
end

#blmove(source, destination, where_source, where_destination, timeout: 0) ⇒ Object

Remove the first/last element in a list and append/prepend it to another list and return it, or block until one is available.



432
433
434
435
436
# File 'lib/redis/distributed.rb', line 432

# Remove the first/last element in a list and append/prepend it to another
# list and return it, or block until one is available.
#
# Source and destination are passed to ensure_same_node, which yields the node
# the command runs on. The command is reported as :blmove (it was previously
# mislabelled :lmove, the non-blocking variant).
#
# @param timeout [Integer] forwarded to the node's #blmove
def blmove(source, destination, where_source, where_destination, timeout: 0)
  ensure_same_node(:blmove, [source, destination]) do |node|
    node.blmove(source, destination, where_source, where_destination, timeout: timeout)
  end
end

#blmpop(timeout, *keys, modifier: "LEFT", count: nil) ⇒ Object

Iterate over keys, blocking and removing elements from the first non-empty list found.



556
557
558
559
560
# File 'lib/redis/distributed.rb', line 556

# Iterate over keys, blocking and removing elements from the first non-empty
# list found. All keys are passed to ensure_same_node, which yields the node.
def blmpop(timeout, *keys, modifier: "LEFT", count: nil)
  pop_options = { modifier: modifier, count: count }
  ensure_same_node(:blmpop, keys) { |node| node.blmpop(timeout, *keys, **pop_options) }
end

#blpop(*args) ⇒ Object

Remove and get the first element in a list, or block until one is available.



495
496
497
# File 'lib/redis/distributed.rb', line 495

# Remove and get the first element in a list, or block until one is available.
# +args+ (keys plus an optional trailing options Hash) are handled by #_bpop.
def blpop(*args)
  _bpop(:blpop, args)
end

#brpop(*args) ⇒ Object

Remove and get the last element in a list, or block until one is available.



513
514
515
# File 'lib/redis/distributed.rb', line 513

# Remove and get the last element in a list, or block until one is available.
# +args+ (keys plus an optional trailing options Hash) are handled by #_bpop.
def brpop(*args)
  _bpop(:brpop, args)
end

#brpoplpush(source, destination, **options) ⇒ Object

Pop a value from a list, push it to another list and return it; or block until one is available.



519
520
521
522
523
# File 'lib/redis/distributed.rb', line 519

# Pop a value from a list, push it to another list and return it; or block
# until one is available. Both keys are passed to ensure_same_node.
def brpoplpush(source, destination, **options)
  lists = [source, destination]
  ensure_same_node(:brpoplpush, lists) { |node| node.brpoplpush(source, destination, **options) }
end

#bzmpop(timeout, *keys, modifier: "MIN", count: nil) ⇒ Object

Iterate over keys, blocking and removing members from the first non empty sorted set found.



722
723
724
725
726
# File 'lib/redis/distributed.rb', line 722

# Iterate over keys, blocking and removing members from the first non-empty
# sorted set found. All keys are passed to ensure_same_node.
def bzmpop(timeout, *keys, modifier: "MIN", count: nil)
  pop_options = { modifier: modifier, count: count }
  ensure_same_node(:bzmpop, keys) { |node| node.bzmpop(timeout, *keys, **pop_options) }
end

#bzpopmax(*args) ⇒ Object



499
500
501
502
503
# File 'lib/redis/distributed.rb', line 499

# Blocking pop of the highest-scored member; +args+ (keys plus an optional
# trailing options Hash) are handled by #_bpop.
#
# The reply is returned exactly as the node produced it. A post-processing
# block used to be passed here, but #_bpop never yields, so it was dead code
# and has been removed.
def bzpopmax(*args)
  _bpop(:bzpopmax, args)
end

#bzpopmin(*args) ⇒ Object



505
506
507
508
509
# File 'lib/redis/distributed.rb', line 505

# Blocking pop of the lowest-scored member; +args+ (keys plus an optional
# trailing options Hash) are handled by #_bpop.
#
# The reply is returned exactly as the node produced it. A post-processing
# block used to be passed here, but #_bpop never yields, so it was dead code
# and has been removed.
def bzpopmin(*args)
  _bpop(:bzpopmin, args)
end

#close ⇒ Object



69
70
71
# File 'lib/redis/distributed.rb', line 69

# Dispatches :close via on_each_node and returns its result.
def close
  on_each_node(:close)
end

#copy(source, destination, **options) ⇒ Object

Copy a value from one key to another.



226
227
228
229
230
# File 'lib/redis/distributed.rb', line 226

# Copy a value from one key to another. Both keys are passed to
# ensure_same_node, which yields the node the command runs on.
def copy(source, destination, **options)
  endpoints = [source, destination]
  ensure_same_node(:copy, endpoints) { |node| node.copy(source, destination, **options) }
end

#dbsize ⇒ Object

Return the number of keys in the selected database.



79
80
81
# File 'lib/redis/distributed.rb', line 79

# Return the number of keys in the selected database; dispatched via
# on_each_node, whose result is returned as-is.
def dbsize
  on_each_node(:dbsize)
end

#decr(key) ⇒ Object

Decrement the integer value of a key by one.



266
267
268
# File 'lib/redis/distributed.rb', line 266

# Decrement the integer value of a key by one, on the node #node_for picks.
def decr(key)
  node = node_for(key)
  node.decr(key)
end

#decrby(key, decrement) ⇒ Object

Decrement the integer value of a key by the given number.



271
272
273
# File 'lib/redis/distributed.rb', line 271

# Decrement the integer value of a key by the given number, on the node
# #node_for picks for +key+.
def decrby(key, decrement)
  node = node_for(key)
  node.decrby(key, decrement)
end

#del(*args) ⇒ Object

Delete a key.



179
180
181
182
183
184
185
# File 'lib/redis/distributed.rb', line 179

# Delete one or more keys.
#
# Keys (optionally given as one nested array) are grouped by the node
# #node_for picks for each, one DEL is issued per node, and the per-node
# replies are added up starting from 0.
def del(*args)
  args.flatten!(1)
  args.group_by { |key| node_for(key) }.sum { |node, keys| node.del(*keys) }
end

#discard ⇒ Object

Discard all commands issued after MULTI.

Raises:



1009
1010
1011
1012
1013
1014
1015
# File 'lib/redis/distributed.rb', line 1009

# Discard all commands issued after MULTI.
#
# Runs on the node for @watch_key and clears @watch_key once the node's reply
# has been obtained.
#
# @raise [CannotDistribute] when no @watch_key is set
def discard
  raise CannotDistribute, :discard unless @watch_key

  node_for(@watch_key).discard.tap { @watch_key = nil }
end

#dump(key) ⇒ Object

Return a serialized version of the value stored at a key.



164
165
166
# File 'lib/redis/distributed.rb', line 164

# Return a serialized version of the value stored at a key, from the node
# #node_for picks for +key+.
def dump(key)
  node = node_for(key)
  node.dump(key)
end

#dup ⇒ Object



1069
1070
1071
# File 'lib/redis/distributed.rb', line 1069

# Builds a fresh instance of the same class from the node configurations and
# default options captured by the constructor.
def dup
  klass = self.class
  klass.new(@node_configs, @default_options)
end

#echo(value) ⇒ Object

Echo the given string.



60
61
62
# File 'lib/redis/distributed.rb', line 60

# Echo the given string; dispatched via on_each_node with +value+.
def echo(value)
  on_each_node(:echo, value)
end

#eval(*args) ⇒ Object

Evaluate Lua script.



1056
1057
1058
# File 'lib/redis/distributed.rb', line 1056

# Evaluate Lua script. +args+ (script, keys, argv and/or an options Hash) are
# interpreted by #_eval.
def eval(*args)
  _eval(:eval, args)
end

#evalsha(*args) ⇒ Object

Evaluate Lua script by its SHA.



1061
1062
1063
# File 'lib/redis/distributed.rb', line 1061

# Evaluate Lua script by its SHA. +args+ (SHA, keys, argv and/or an options
# Hash) are interpreted by #_eval.
def evalsha(*args)
  _eval(:evalsha, args)
end

#exec ⇒ Object

Execute all commands issued after MULTI.

Raises:



1000
1001
1002
1003
1004
1005
1006
# File 'lib/redis/distributed.rb', line 1000

# Execute all commands issued after MULTI.
#
# Runs on the node for @watch_key and clears @watch_key once the node's reply
# has been obtained.
#
# @raise [CannotDistribute] when no @watch_key is set
def exec
  raise CannotDistribute, :exec unless @watch_key

  node_for(@watch_key).exec.tap { @watch_key = nil }
end

#exists(*args) ⇒ Object

Determine if a key exists.



197
198
199
200
201
202
203
# File 'lib/redis/distributed.rb', line 197

# Determine how many of the given keys exist.
#
# Keys (optionally given as one nested array) are grouped by the node
# #node_for picks for each; the per-node replies are added up starting from 0.
def exists(*args)
  args.flatten!(1)
  args.group_by { |key| node_for(key) }.sum { |node, keys| node.exists(*keys) }
end

#exists?(*args) ⇒ Boolean

Determine if any of the keys exists.

Returns:

  • (Boolean)


206
207
208
209
210
211
212
213
# File 'lib/redis/distributed.rb', line 206

# Determine if any of the keys exists.
#
# Keys are grouped by the node #node_for picks for each; nodes are asked in
# turn and the search stops at the first truthy answer.
#
# @return [Boolean]
def exists?(*args)
  args.flatten!(1)
  args.group_by { |key| node_for(key) }.any? { |node, keys| node.exists?(*keys) }
end

#expire(key, seconds, **kwargs) ⇒ Object

Set a key’s time to live in seconds.



124
125
126
# File 'lib/redis/distributed.rb', line 124

# Set a key's time to live in seconds, on the node #node_for picks for +key+.
# Extra keyword arguments are forwarded unchanged.
def expire(key, seconds, **kwargs)
  node = node_for(key)
  node.expire(key, seconds, **kwargs)
end

#expireat(key, unix_time, **kwargs) ⇒ Object

Set the expiration for a key as a UNIX timestamp.



129
130
131
# File 'lib/redis/distributed.rb', line 129

# Set the expiration for a key as a UNIX timestamp, on the node #node_for
# picks for +key+. Extra keyword arguments are forwarded unchanged.
def expireat(key, unix_time, **kwargs)
  node = node_for(key)
  node.expireat(key, unix_time, **kwargs)
end

#expiretime(key) ⇒ Object

Get the expiration for a key as a UNIX timestamp.



134
135
136
# File 'lib/redis/distributed.rb', line 134

# Get the expiration for a key as a UNIX timestamp, from the node #node_for
# picks for +key+.
def expiretime(key)
  node = node_for(key)
  node.expiretime(key)
end

#flushall ⇒ Object

Remove all keys from all databases.



84
85
86
# File 'lib/redis/distributed.rb', line 84

# Remove all keys from all databases; dispatched via on_each_node.
def flushall
  on_each_node(:flushall)
end

#flushdb ⇒ Object

Remove all keys from the current database.



89
90
91
# File 'lib/redis/distributed.rb', line 89

# Remove all keys from the current database; dispatched via on_each_node.
def flushdb
  on_each_node(:flushdb)
end

#get(key) ⇒ Object

Get the value of a key.



329
330
331
# File 'lib/redis/distributed.rb', line 329

# Get the value of a key from the node #node_for picks for +key+.
def get(key)
  node = node_for(key)
  node.get(key)
end

#getbit(key, offset) ⇒ Object

Returns the bit value at offset in the string value stored at key.



373
374
375
# File 'lib/redis/distributed.rb', line 373

# Returns the bit value at offset in the string value stored at key, from the
# node #node_for picks for +key+.
def getbit(key, offset)
  node = node_for(key)
  node.getbit(key, offset)
end

#getdel(key) ⇒ Object

Get the value of a key and delete it.



334
335
336
# File 'lib/redis/distributed.rb', line 334

# Get the value of a key and delete it, on the node #node_for picks for +key+.
def getdel(key)
  node = node_for(key)
  node.getdel(key)
end

#getex(key, **options) ⇒ Object

Get the value of a key and sets its time to live based on options.



339
340
341
# File 'lib/redis/distributed.rb', line 339

# Get the value of a key and set its time to live based on options, on the
# node #node_for picks for +key+. Options are forwarded unchanged.
def getex(key, **options)
  node = node_for(key)
  node.getex(key, **options)
end

#getrange(key, start, stop) ⇒ Object

Get a substring of the string stored at a key.



363
364
365
# File 'lib/redis/distributed.rb', line 363

# Get a substring of the string stored at a key, from the node #node_for
# picks for +key+.
def getrange(key, start, stop)
  node = node_for(key)
  node.getrange(key, start, stop)
end

#getset(key, value) ⇒ Object

Set the string value of a key and return its old value.



401
402
403
# File 'lib/redis/distributed.rb', line 401

# Set the string value of a key and return its old value, on the node
# #node_for picks for +key+.
def getset(key, value)
  node = node_for(key)
  node.getset(key, value)
end

#hdel(key, *fields) ⇒ Object

Delete one or more hash fields.



886
887
888
889
# File 'lib/redis/distributed.rb', line 886

# Delete one or more hash fields. Fields may be given as one nested array;
# they are flattened one level and passed to the node as a single array.
def hdel(key, *fields)
  fields.flatten!(1)
  node = node_for(key)
  node.hdel(key, fields)
end

#hexists(key, field) ⇒ Object

Determine if a hash field exists.



892
893
894
# File 'lib/redis/distributed.rb', line 892

# Determine if a hash field exists, on the node #node_for picks for +key+.
def hexists(key, field)
  node = node_for(key)
  node.hexists(key, field)
end

#hget(key, field) ⇒ Object

Get the value of a hash field.



866
867
868
# File 'lib/redis/distributed.rb', line 866

# Get the value of a hash field from the node #node_for picks for +key+.
def hget(key, field)
  node = node_for(key)
  node.hget(key, field)
end

#hgetall(key) ⇒ Object

Get all the fields and values in a hash.



917
918
919
# File 'lib/redis/distributed.rb', line 917

# Get all the fields and values in a hash from the node #node_for picks.
def hgetall(key)
  node = node_for(key)
  node.hgetall(key)
end

#hincrby(key, field, increment) ⇒ Object

Increment the integer value of a hash field by the given integer number.



897
898
899
# File 'lib/redis/distributed.rb', line 897

# Increment the integer value of a hash field by the given integer number,
# on the node #node_for picks for +key+.
def hincrby(key, field, increment)
  node = node_for(key)
  node.hincrby(key, field, increment)
end

#hincrbyfloat(key, field, increment) ⇒ Object

Increment the numeric value of a hash field by the given float number.



902
903
904
# File 'lib/redis/distributed.rb', line 902

# Increment the numeric value of a hash field by the given float number,
# on the node #node_for picks for +key+.
def hincrbyfloat(key, field, increment)
  node = node_for(key)
  node.hincrbyfloat(key, field, increment)
end

#hkeys(key) ⇒ Object

Get all the fields in a hash.



907
908
909
# File 'lib/redis/distributed.rb', line 907

# Get all the fields in a hash from the node #node_for picks for +key+.
def hkeys(key)
  node = node_for(key)
  node.hkeys(key)
end

#hlen(key) ⇒ Object

Get the number of fields in a hash.



842
843
844
# File 'lib/redis/distributed.rb', line 842

# Get the number of fields in a hash from the node #node_for picks for +key+.
def hlen(key)
  node = node_for(key)
  node.hlen(key)
end

#hmget(key, *fields) ⇒ Object

Get the values of all the given hash fields.



871
872
873
874
# File 'lib/redis/distributed.rb', line 871

# Get the values of all the given hash fields. Fields may be given as one
# nested array; they are flattened one level and passed as a single array.
def hmget(key, *fields)
  fields.flatten!(1)
  node = node_for(key)
  node.hmget(key, fields)
end

#hmset(key, *attrs) ⇒ Object

Set multiple hash fields to multiple values.



857
858
859
# File 'lib/redis/distributed.rb', line 857

# Set multiple hash fields to multiple values, on the node #node_for picks
# for +key+. +attrs+ are forwarded as given.
def hmset(key, *attrs)
  node = node_for(key)
  node.hmset(key, *attrs)
end

#hrandfield(key, count = nil, **options) ⇒ Object



881
882
883
# File 'lib/redis/distributed.rb', line 881

# Forwards HRANDFIELD to the node #node_for picks for +key+; +count+ and any
# keyword options are passed through unchanged.
def hrandfield(key, count = nil, **options)
  node = node_for(key)
  node.hrandfield(key, count, **options)
end

#hset(key, *attrs) ⇒ Object

Set multiple hash fields to multiple values.



847
848
849
# File 'lib/redis/distributed.rb', line 847

# Set multiple hash fields to multiple values, on the node #node_for picks
# for +key+. +attrs+ are forwarded as given.
def hset(key, *attrs)
  node = node_for(key)
  node.hset(key, *attrs)
end

#hsetnx(key, field, value) ⇒ Object

Set the value of a hash field, only if the field does not exist.



852
853
854
# File 'lib/redis/distributed.rb', line 852

# Set the value of a hash field, only if the field does not exist, on the
# node #node_for picks for +key+.
def hsetnx(key, field, value)
  node = node_for(key)
  node.hsetnx(key, field, value)
end

#hvals(key) ⇒ Object

Get all the values in a hash.



912
913
914
# File 'lib/redis/distributed.rb', line 912

# Get all the values in a hash from the node #node_for picks for +key+.
def hvals(key)
  node = node_for(key)
  node.hvals(key)
end

#incr(key) ⇒ Object

Increment the integer value of a key by one.



276
277
278
# File 'lib/redis/distributed.rb', line 276

# Increment the integer value of a key by one, on the node #node_for picks.
def incr(key)
  node = node_for(key)
  node.incr(key)
end

#incrby(key, increment) ⇒ Object

Increment the integer value of a key by the given integer number.



281
282
283
# File 'lib/redis/distributed.rb', line 281

# Increment the integer value of a key by the given integer number, on the
# node #node_for picks for +key+.
def incrby(key, increment)
  node = node_for(key)
  node.incrby(key, increment)
end

#incrbyfloat(key, increment) ⇒ Object

Increment the numeric value of a key by the given float number.



286
287
288
# File 'lib/redis/distributed.rb', line 286

# Increment the numeric value of a key by the given float number, on the
# node #node_for picks for +key+.
def incrbyfloat(key, increment)
  node = node_for(key)
  node.incrbyfloat(key, increment)
end

#info(cmd = nil) ⇒ Object

Get information and statistics about the server.



94
95
96
# File 'lib/redis/distributed.rb', line 94

# Get information and statistics about the server; dispatched via
# on_each_node with +cmd+ (nil when omitted).
def info(cmd = nil)
  on_each_node(:info, cmd)
end

#inspect ⇒ Object



1065
1066
1067
# File 'lib/redis/distributed.rb', line 1065

# Human-readable summary: the Redis client version followed by the
# comma-separated ids of all nodes.
def inspect
  node_ids = nodes.map { |node| node.id }.join(', ')
  "#<Redis client v#{Redis::VERSION} for #{node_ids}>"
end

#keys(glob = "*") ⇒ Object

Find all keys matching the given pattern.



216
217
218
# File 'lib/redis/distributed.rb', line 216

# Find all keys matching the given pattern. The result of on_each_node is
# flattened into a single array.
def keys(glob = "*")
  per_node = on_each_node(:keys, glob)
  per_node.flatten
end

#lastsave ⇒ Object

Get the UNIX time stamp of the last successful save to disk.



99
100
101
# File 'lib/redis/distributed.rb', line 99

# Get the UNIX time stamp of the last successful save to disk; dispatched
# via on_each_node.
def lastsave
  on_each_node(:lastsave)
end

#lindex(key, index) ⇒ Object

Get an element from a list by its index.



526
527
528
# File 'lib/redis/distributed.rb', line 526

# Get an element from a list by its index, from the node #node_for picks.
def lindex(key, index)
  node = node_for(key)
  node.lindex(key, index)
end

#linsert(key, where, pivot, value) ⇒ Object

Insert an element before or after another element in a list.



531
532
533
# File 'lib/redis/distributed.rb', line 531

# Insert an element before or after another element in a list, on the node
# #node_for picks for +key+.
def linsert(key, where, pivot, value)
  node = node_for(key)
  node.linsert(key, where, pivot, value)
end

#llen(key) ⇒ Object

Get the length of a list.



419
420
421
# File 'lib/redis/distributed.rb', line 419

# Get the length of a list from the node #node_for picks for +key+.
def llen(key)
  node = node_for(key)
  node.llen(key)
end

#lmove(source, destination, where_source, where_destination) ⇒ Object

Remove the first/last element in a list, append/prepend it to another list and return it.



424
425
426
427
428
# File 'lib/redis/distributed.rb', line 424

# Remove the first/last element in a list, append/prepend it to another list
# and return it. Both keys are passed to ensure_same_node, which yields the
# node the command runs on.
def lmove(source, destination, where_source, where_destination)
  lists = [source, destination]
  ensure_same_node(:lmove, lists) do |node|
    node.lmove(source, destination, where_source, where_destination)
  end
end

#lmpop(*keys, modifier: "LEFT", count: nil) ⇒ Object

Iterate over keys, removing elements from the first non-empty list found.



563
564
565
566
567
# File 'lib/redis/distributed.rb', line 563

# Iterate over keys, removing elements from the first non-empty list found.
# All keys are passed to ensure_same_node, which yields the node.
def lmpop(*keys, modifier: "LEFT", count: nil)
  pop_options = { modifier: modifier, count: count }
  ensure_same_node(:lmpop, keys) { |node| node.lmpop(*keys, **pop_options) }
end

#lpop(key, count = nil) ⇒ Object

Remove and get the first elements in a list.



459
460
461
# File 'lib/redis/distributed.rb', line 459

# Remove and get the first elements in a list, on the node #node_for picks
# for +key+; +count+ is forwarded (nil when omitted).
def lpop(key, count = nil)
  node = node_for(key)
  node.lpop(key, count)
end

#lpush(key, value) ⇒ Object

Prepend one or more values to a list.



439
440
441
# File 'lib/redis/distributed.rb', line 439

# Prepend one or more values to a list, on the node #node_for picks.
def lpush(key, value)
  node = node_for(key)
  node.lpush(key, value)
end

#lpushx(key, value) ⇒ Object

Prepend a value to a list, only if the list exists.



444
445
446
# File 'lib/redis/distributed.rb', line 444

# Prepend a value to a list, only if the list exists, on the node #node_for
# picks for +key+.
def lpushx(key, value)
  node = node_for(key)
  node.lpushx(key, value)
end

#lrange(key, start, stop) ⇒ Object

Get a range of elements from a list.



536
537
538
# File 'lib/redis/distributed.rb', line 536

# Get a range of elements from a list, from the node #node_for picks.
def lrange(key, start, stop)
  node = node_for(key)
  node.lrange(key, start, stop)
end

#lrem(key, count, value) ⇒ Object

Remove elements from a list.



541
542
543
# File 'lib/redis/distributed.rb', line 541

# Remove elements from a list, on the node #node_for picks for +key+.
def lrem(key, count, value)
  node = node_for(key)
  node.lrem(key, count, value)
end

#lset(key, index, value) ⇒ Object

Set the value of an element in a list by its index.



546
547
548
# File 'lib/redis/distributed.rb', line 546

# Set the value of an element in a list by its index, on the node #node_for
# picks for +key+.
def lset(key, index, value)
  node = node_for(key)
  node.lset(key, index, value)
end

#ltrim(key, start, stop) ⇒ Object

Trim a list to the specified range.



551
552
553
# File 'lib/redis/distributed.rb', line 551

# Trim a list to the specified range, on the node #node_for picks.
def ltrim(key, start, stop)
  node = node_for(key)
  node.ltrim(key, start, stop)
end

#mapped_hmget(key, *fields) ⇒ Object



876
877
878
879
# File 'lib/redis/distributed.rb', line 876

# Forwards to the node's #mapped_hmget on the node #node_for picks for +key+.
# Fields may be given as one nested array; they are flattened one level and
# passed as a single array.
def mapped_hmget(key, *fields)
  fields.flatten!(1)
  node = node_for(key)
  node.mapped_hmget(key, fields)
end

#mapped_hmset(key, hash) ⇒ Object



861
862
863
# File 'lib/redis/distributed.rb', line 861

# Sets hash fields from +hash+ by calling the node's #hmset with the hash as
# a single argument, on the node #node_for picks for +key+.
def mapped_hmset(key, hash)
  node = node_for(key)
  node.hmset(key, hash)
end

#mapped_mget(*keys) ⇒ Object

Get the values of all the given keys as a Hash.



350
351
352
353
354
355
# File 'lib/redis/distributed.rb', line 350

# Get the values of all the given keys as a Hash.
#
# Keys (optionally given as one nested array) are grouped by the node
# #node_for picks for each; every node's #mapped_mget result is merged into
# one Hash.
def mapped_mget(*keys)
  keys.flatten!(1)
  keys.group_by { |k| node_for(k) }.each_with_object({}) do |(node, subkeys), results|
    results.merge!(node.mapped_mget(*subkeys))
  end
end

#mapped_mset(_hash) ⇒ Object

Raises:



315
316
317
# File 'lib/redis/distributed.rb', line 315

# Always raises CannotDistribute (tagged :mapped_mset); +_hash+ is not
# inspected.
def mapped_mset(_hash)
  raise CannotDistribute, :mapped_mset
end

#mapped_msetnx(_hash) ⇒ Object

Raises:



324
325
326
# File 'lib/redis/distributed.rb', line 324

# Always raises CannotDistribute (tagged :mapped_msetnx); +_hash+ is not
# inspected.
def mapped_msetnx(_hash)
  raise CannotDistribute, :mapped_msetnx
end

#mget(*keys) ⇒ Object

Get the values of all the given keys as an Array.



344
345
346
347
# File 'lib/redis/distributed.rb', line 344

# Get the values of all the given keys as an Array, ordered like +keys+
# (after flattening one level). Built on #mapped_mget; keys missing from its
# Hash yield nil.
def mget(*keys)
  keys.flatten!(1)
  values_by_key = mapped_mget(*keys)
  values_by_key.values_at(*keys)
end

#migrate(_key, _options) ⇒ Object

Transfer a key from the connected instance to another instance.

Raises:



174
175
176
# File 'lib/redis/distributed.rb', line 174

# Transfer a key from the connected instance to another instance.
# Not supported here: always raises CannotDistribute (tagged :migrate); both
# arguments are ignored.
def migrate(_key, _options)
  raise CannotDistribute, :migrate
end

#monitor ⇒ Object

Listen for all requests received by the server in real time.

Raises:

  • (NotImplementedError)


104
105
106
# File 'lib/redis/distributed.rb', line 104

# Listen for all requests received by the server in real time.
# Not implemented here: always raises NotImplementedError.
def monitor
  raise NotImplementedError
end

#move(key, db) ⇒ Object

Move a key to another database.



221
222
223
# File 'lib/redis/distributed.rb', line 221

# Move a key to another database, on the node #node_for picks for +key+.
def move(key, db)
  node = node_for(key)
  node.move(key, db)
end

#mset(*) ⇒ Object

Set multiple keys to multiple values.

Raises:



311
312
313
# File 'lib/redis/distributed.rb', line 311

# Set multiple keys to multiple values.
# Not supported here: always raises CannotDistribute (tagged :mset); all
# arguments are ignored.
def mset(*)
  raise CannotDistribute, :mset
end

#msetnx(*) ⇒ Object

Set multiple keys to multiple values, only if none of the keys exist.

Raises:



320
321
322
# File 'lib/redis/distributed.rb', line 320

# Set multiple keys to multiple values, only if none of the keys exist.
# Not supported here: always raises CannotDistribute (tagged :msetnx); all
# arguments are ignored.
def msetnx(*)
  raise CannotDistribute, :msetnx
end

#multi(&block) ⇒ Object

Mark the start of a transaction block.

Raises:



993
994
995
996
997
# File 'lib/redis/distributed.rb', line 993

# Mark the start of a transaction block. Runs on the node for @watch_key and
# forwards the block to that node's #multi.
#
# @raise [CannotDistribute] when no @watch_key is set
def multi(&block)
  raise CannotDistribute, :multi unless @watch_key

  node = node_for(@watch_key)
  node.multi(&block)
end

#node_for(key) ⇒ Object

Raises:



30
31
32
33
34
35
# File 'lib/redis/distributed.rb', line 30

# Picks the ring node responsible for +key+.
#
# The key is stringified; when key_tag extracts a tag from it, the tag is
# used for routing instead of the full key.
#
# @raise [CannotDistribute] when a @watch_key is set and differs from the
#   routing key
def node_for(key)
  raw_key = key.to_s
  routing_key = key_tag(raw_key) || raw_key
  if @watch_key && @watch_key != routing_key
    raise CannotDistribute, :watch
  end

  @ring.get_node(routing_key)
end

#nodes ⇒ Object



37
38
39
# File 'lib/redis/distributed.rb', line 37

# Returns whatever the ring reports as its nodes (@ring.nodes).
def nodes
  @ring.nodes
end

#persist(key) ⇒ Object

Remove the expiration from a key.



119
120
121
# File 'lib/redis/distributed.rb', line 119

# Remove the expiration from a key, on the node #node_for picks for +key+.
def persist(key)
  node = node_for(key)
  node.persist(key)
end

#pexpire(key, milliseconds, **kwarg) ⇒ Object

Set a key’s time to live in milliseconds.



144
145
146
# File 'lib/redis/distributed.rb', line 144

# Set a key's time to live in milliseconds, on the node #node_for picks for
# +key+. Extra keyword arguments are forwarded unchanged.
def pexpire(key, milliseconds, **kwarg)
  node = node_for(key)
  node.pexpire(key, milliseconds, **kwarg)
end

#pexpireat(key, ms_unix_time, **kwarg) ⇒ Object

Set the expiration for a key as number of milliseconds from UNIX Epoch.



149
150
151
# File 'lib/redis/distributed.rb', line 149

# Set the expiration for a key as number of milliseconds from UNIX Epoch, on
# the node #node_for picks for +key+. Keyword arguments are forwarded.
def pexpireat(key, ms_unix_time, **kwarg)
  node = node_for(key)
  node.pexpireat(key, ms_unix_time, **kwarg)
end

#pexpiretime(key) ⇒ Object

Get the expiration for a key as number of milliseconds from UNIX Epoch.



154
155
156
# File 'lib/redis/distributed.rb', line 154

# Get the expiration for a key as number of milliseconds from UNIX Epoch,
# from the node #node_for picks for +key+.
def pexpiretime(key)
  node = node_for(key)
  node.pexpiretime(key)
end

#pfadd(key, member) ⇒ Object

Add one or more members to a HyperLogLog structure.



1023
1024
1025
# File 'lib/redis/distributed.rb', line 1023

# Add one or more members to a HyperLogLog structure, on the node #node_for
# picks for +key+.
def pfadd(key, member)
  node = node_for(key)
  node.pfadd(key, member)
end

#pfcount(*keys) ⇒ Object

Get the approximate cardinality of members added to HyperLogLog structure.



1028
1029
1030
1031
1032
# File 'lib/redis/distributed.rb', line 1028

# Get the approximate cardinality of members added to HyperLogLog structure.
# A one-level-flattened copy of the keys goes to ensure_same_node; the node
# itself receives +keys+ as originally given, as one array argument.
def pfcount(*keys)
  flat_keys = keys.flatten(1)
  ensure_same_node(:pfcount, flat_keys) { |node| node.pfcount(keys) }
end

#pfmerge(dest_key, *source_key) ⇒ Object

Merge multiple HyperLogLog values into a unique value that will approximate the cardinality of the union of the observed Sets of the source HyperLogLog structures.



1036
1037
1038
1039
1040
# File 'lib/redis/distributed.rb', line 1036

# Merge multiple HyperLogLog values into one destination key. The
# destination and all sources are passed to ensure_same_node, which yields
# the node the command runs on.
def pfmerge(dest_key, *source_key)
  involved = [dest_key, *source_key]
  ensure_same_node(:pfmerge, involved) { |node| node.pfmerge(dest_key, *source_key) }
end

#ping ⇒ Object

Ping the server.



55
56
57
# File 'lib/redis/distributed.rb', line 55

# Ping the server; dispatched via on_each_node.
def ping
  on_each_node(:ping)
end

#pipelined ⇒ Object

Raises:



988
989
990
# File 'lib/redis/distributed.rb', line 988

# Not supported here: always raises CannotDistribute (tagged :pipelined).
def pipelined
  raise CannotDistribute, :pipelined
end

#psetex(key, ttl, value) ⇒ Object

Set the time to live in milliseconds of a key.



301
302
303
# File 'lib/redis/distributed.rb', line 301

# Set the time to live in milliseconds of a key, on the node #node_for picks
# for +key+.
def psetex(key, ttl, value)
  node = node_for(key)
  node.psetex(key, ttl, value)
end

#psubscribe(*channels, &block) ⇒ Object

Listen for messages published to channels matching the given patterns. See the [Redis Server PSUBSCRIBE documentation](https://redis.io/docs/latest/commands/psubscribe/) for further details.

Raises:

  • (NotImplementedError)


953
954
955
# File 'lib/redis/distributed.rb', line 953

# Pattern subscriptions are not implemented here: always raises
# NotImplementedError; +channels+ and +block+ are ignored.
def psubscribe(*channels, &block)
  raise NotImplementedError
end

#pttl(key) ⇒ Object

Get the time to live (in milliseconds) for a key.



159
160
161
# File 'lib/redis/distributed.rb', line 159

# Get the time to live (in milliseconds) for a key, from the node #node_for
# picks for +key+.
def pttl(key)
  node = node_for(key)
  node.pttl(key)
end

#publish(channel, message) ⇒ Object

Post a message to a channel.



922
923
924
# File 'lib/redis/distributed.rb', line 922

# Post a message to a channel. The channel name is routed through #node_for
# like a key, and the message is published on the node it returns.
def publish(channel, message)
  node = node_for(channel)
  node.publish(channel, message)
end

#punsubscribe(*channels) ⇒ Object

Stop listening for messages posted to channels matching the given patterns. See the [Redis Server PUNSUBSCRIBE documentation](https://redis.io/docs/latest/commands/punsubscribe/) for further details.

Raises:

  • (NotImplementedError)


961
962
963
# File 'lib/redis/distributed.rb', line 961

# Pattern unsubscription is not implemented here: always raises
# NotImplementedError; +channels+ is ignored.
def punsubscribe(*channels)
  raise NotImplementedError
end

#quit ⇒ Object

Close the connection.



65
66
67
# File 'lib/redis/distributed.rb', line 65

# Close the connection; dispatched via on_each_node.
def quit
  on_each_node(:quit)
end

#randomkey ⇒ Object

Return a random key from the keyspace.

Raises:



233
234
235
# File 'lib/redis/distributed.rb', line 233

# Return a random key from the keyspace.
# Not supported here: always raises CannotDistribute (tagged :randomkey).
def randomkey
  raise CannotDistribute, :randomkey
end

#rename(old_name, new_name) ⇒ Object

Rename a key.



238
239
240
241
242
# File 'lib/redis/distributed.rb', line 238

# Rename a key. Old and new names are passed to ensure_same_node, which
# yields the node the command runs on.
def rename(old_name, new_name)
  names = [old_name, new_name]
  ensure_same_node(:rename, names) { |node| node.rename(old_name, new_name) }
end

#renamenx(old_name, new_name) ⇒ Object

Rename a key, only if the new key does not exist.



245
246
247
248
249
# File 'lib/redis/distributed.rb', line 245

# Rename a key, only if the new key does not exist. Old and new names are
# passed to ensure_same_node, which yields the node the command runs on.
def renamenx(old_name, new_name)
  names = [old_name, new_name]
  ensure_same_node(:renamenx, names) { |node| node.renamenx(old_name, new_name) }
end

#restore(key, ttl, serialized_value, **options) ⇒ Object

Create a key using the serialized value, previously obtained using DUMP.



169
170
171
# File 'lib/redis/distributed.rb', line 169

# Create a key using the serialized value, previously obtained using DUMP,
# on the node #node_for picks for +key+. Options are forwarded unchanged.
def restore(key, ttl, serialized_value, **options)
  node = node_for(key)
  node.restore(key, ttl, serialized_value, **options)
end

#rpop(key, count = nil) ⇒ Object

Remove and get the last elements in a list.



464
465
466
# File 'lib/redis/distributed.rb', line 464

# Remove and get the last elements in a list, on the node #node_for picks
# for +key+; +count+ is forwarded (nil when omitted).
def rpop(key, count = nil)
  node = node_for(key)
  node.rpop(key, count)
end

#rpoplpush(source, destination) ⇒ Object

Remove the last element in a list, append it to another list and return it.



470
471
472
473
474
# File 'lib/redis/distributed.rb', line 470

# Remove the last element in a list, append it to another list and return
# it. Both keys are passed to ensure_same_node, which yields the node.
def rpoplpush(source, destination)
  lists = [source, destination]
  ensure_same_node(:rpoplpush, lists) { |node| node.rpoplpush(source, destination) }
end

#rpush(key, value) ⇒ Object

Append one or more values to a list.



449
450
451
# File 'lib/redis/distributed.rb', line 449

# Append one or more values to a list, on the node #node_for picks.
def rpush(key, value)
  node = node_for(key)
  node.rpush(key, value)
end

#rpushx(key, value) ⇒ Object

Append a value to a list, only if the list exists.



454
455
456
# File 'lib/redis/distributed.rb', line 454

# Append a value to a list, only if the list exists, on the node #node_for
# picks for +key+.
def rpushx(key, value)
  node = node_for(key)
  node.rpushx(key, value)
end

#sadd(key, *members) ⇒ Object

Add one or more members to a set.



575
576
577
# File 'lib/redis/distributed.rb', line 575

# Add one or more members to a set, on the node #node_for picks for +key+.
def sadd(key, *members)
  node = node_for(key)
  node.sadd(key, *members)
end

#sadd?(key, *members) ⇒ Boolean

Add one or more members to a set.

Returns:

  • (Boolean)


580
581
582
# File 'lib/redis/distributed.rb', line 580

# Add one or more members to a set via the node's #sadd? (documented as
# returning a Boolean), on the node #node_for picks for +key+.
def sadd?(key, *members)
  node = node_for(key)
  node.sadd?(key, *members)
end

#save ⇒ Object

Synchronously save the dataset to disk.



109
110
111
# File 'lib/redis/distributed.rb', line 109

# Synchronously save the dataset to disk; dispatched via on_each_node.
def save
  on_each_node(:save)
end

#scard(key) ⇒ Object

Get the number of members in a set.



570
571
572
# File 'lib/redis/distributed.rb', line 570

# Get the number of members in a set from the node #node_for picks.
def scard(key)
  node = node_for(key)
  node.scard(key)
end

#script(subcommand, *args) ⇒ Object

Control remote script registry.



1018
1019
1020
# File 'lib/redis/distributed.rb', line 1018

# Control remote script registry. The subcommand and its arguments are
# dispatched via on_each_node, whose result is returned as-is.
def script(subcommand, *args)
  on_each_node(:script, subcommand, *args)
end

#sdiff(*keys) ⇒ Object

Subtract multiple sets.



637
638
639
640
641
642
# File 'lib/redis/distributed.rb', line 637

# Subtract multiple sets. Keys may be given as one nested array; after
# flattening one level they are passed to ensure_same_node and then to the
# node as a single array.
def sdiff(*keys)
  keys.flatten!(1)
  ensure_same_node(:sdiff, keys) { |node| node.sdiff(keys) }
end

#sdiffstore(destination, *keys) ⇒ Object

Subtract multiple sets and store the resulting set in a key.



645
646
647
648
649
650
# File 'lib/redis/distributed.rb', line 645

# Subtract multiple sets and store the resulting set in a key. The
# destination plus all (one-level-flattened) source keys are passed to
# ensure_same_node, which yields the node the command runs on.
def sdiffstore(destination, *keys)
  keys.flatten!(1)
  involved = [destination, *keys]
  ensure_same_node(:sdiffstore, involved) { |node| node.sdiffstore(destination, keys) }
end

#select(db) ⇒ Object

Change the selected database for the current connection.



50
51
52
# File 'lib/redis/distributed.rb', line 50

# Change the selected database for the current connection; dispatched via
# on_each_node with +db+.
def select(db)
  on_each_node(:select, db)
end

#set(key, value, **options) ⇒ Object

Set the string value of a key.



291
292
293
# File 'lib/redis/distributed.rb', line 291

# Set the string value of a key, on the node #node_for picks for +key+.
# Options are forwarded unchanged.
def set(key, value, **options)
  node = node_for(key)
  node.set(key, value, **options)
end

#setbit(key, offset, value) ⇒ Object

Sets or clears the bit at offset in the string value stored at key.



368
369
370
# File 'lib/redis/distributed.rb', line 368

# Sets or clears the bit at offset in the string value stored at key, on the
# node #node_for picks for +key+.
def setbit(key, offset, value)
  node = node_for(key)
  node.setbit(key, offset, value)
end

#setex(key, ttl, value) ⇒ Object

Set the time to live in seconds of a key.



296
297
298
# File 'lib/redis/distributed.rb', line 296

# Set the time to live in seconds of a key, on the node #node_for picks for
# +key+.
def setex(key, ttl, value)
  node = node_for(key)
  node.setex(key, ttl, value)
end

#setnx(key, value) ⇒ Object

Set the value of a key, only if the key does not exist.



306
307
308
# File 'lib/redis/distributed.rb', line 306

# Set the value of a key, only if the key does not exist, on the node
# #node_for picks for +key+.
def setnx(key, value)
  node = node_for(key)
  node.setnx(key, value)
end

#setrange(key, offset, value) ⇒ Object

Overwrite part of a string at key starting at the specified offset.



358
359
360
# File 'lib/redis/distributed.rb', line 358

# Overwrite part of a string at key starting at the specified offset, on the
# node #node_for picks for +key+.
def setrange(key, offset, value)
  node = node_for(key)
  node.setrange(key, offset, value)
end

#sinter(*keys) ⇒ Object

Intersect multiple sets.



653
654
655
656
657
658
# File 'lib/redis/distributed.rb', line 653

# Intersect multiple sets. Keys may be given as one nested array; after
# flattening one level they are passed to ensure_same_node and then to the
# node as a single array.
def sinter(*keys)
  keys.flatten!(1)
  ensure_same_node(:sinter, keys) { |node| node.sinter(keys) }
end

#sinterstore(destination, *keys) ⇒ Object

Intersect multiple sets and store the resulting set in a key.



661
662
663
664
665
666
# File 'lib/redis/distributed.rb', line 661

# Intersect multiple sets and store the resulting set in a key. The
# destination plus all (one-level-flattened) source keys are passed to
# ensure_same_node, which yields the node the command runs on.
def sinterstore(destination, *keys)
  keys.flatten!(1)
  involved = [destination, *keys]
  ensure_same_node(:sinterstore, involved) { |node| node.sinterstore(destination, keys) }
end

#sismember(key, member) ⇒ Object

Determine if a given value is a member of a set.



612
613
614
# File 'lib/redis/distributed.rb', line 612

# Determine if a given value is a member of a set, on the node #node_for
# picks for +key+.
def sismember(key, member)
  node = node_for(key)
  node.sismember(key, member)
end

#smembers(key) ⇒ Object

Get all the members in a set.



622
623
624
# File 'lib/redis/distributed.rb', line 622

# Get all the members in a set from the node #node_for picks for +key+.
def smembers(key)
  node = node_for(key)
  node.smembers(key)
end

#smismember(key, *members) ⇒ Object

Determine if multiple values are members of a set.



617
618
619
# File 'lib/redis/distributed.rb', line 617

# Determine if multiple values are members of a set, on the node #node_for
# picks for +key+.
def smismember(key, *members)
  node = node_for(key)
  node.smismember(key, *members)
end

#smove(source, destination, member) ⇒ Object

Move a member from one set to another.



605
606
607
608
609
# File 'lib/redis/distributed.rb', line 605

# Move a member from one set to another. Both set keys are passed to
# ensure_same_node, which yields the node the command runs on.
def smove(source, destination, member)
  sets = [source, destination]
  ensure_same_node(:smove, sets) { |node| node.smove(source, destination, member) }
end

#sort(key, **options) ⇒ Object

Sort the elements in a list, set or sorted set.



252
253
254
255
256
257
258
# File 'lib/redis/distributed.rb', line 252

# Sort the elements in a list, set or sorted set.
#
# The sorted key plus the :by, :store and :get option values (nil entries
# dropped) are passed to ensure_same_node; options go to the node unchanged.
def sort(key, **options)
  referenced = [key, options[:by], options[:store]]
  referenced.concat(Array(options[:get]))
  referenced.compact!

  ensure_same_node(:sort, referenced) { |node| node.sort(key, **options) }
end

#spop(key, count = nil) ⇒ Object

Remove and return a random member from a set.



595
596
597
# File 'lib/redis/distributed.rb', line 595

# Remove and return a random member from a set, on the node #node_for picks
# for +key+; +count+ is forwarded (nil when omitted).
def spop(key, count = nil)
  node = node_for(key)
  node.spop(key, count)
end

#srandmember(key, count = nil) ⇒ Object

Get a random member from a set.



600
601
602
# File 'lib/redis/distributed.rb', line 600

# Get a random member from a set.
#
# +count+ (nil by default) is always passed through to the node that
# node_for picks for +key+.
def srandmember(key, count = nil)
  owner = node_for(key)
  owner.srandmember(key, count)
end

#srem(key, *members) ⇒ Object

Remove one or more members from a set.



585
586
587
# File 'lib/redis/distributed.rb', line 585

# Remove one or more members from a set.
#
# +members+ is splatted back out when forwarding to the node that
# node_for picks for +key+.
def srem(key, *members)
  node = node_for(key)
  node.srem(key, *members)
end

#srem?(key, *members) ⇒ Boolean

Remove one or more members from a set.

Returns:

  • (Boolean)


590
591
592
# File 'lib/redis/distributed.rb', line 590

# Remove one or more members from a set (predicate form).
#
# Forwards to +srem?+ on the node that node_for picks for +key+ and
# returns that node's answer.
def srem?(key, *members)
  node = node_for(key)
  node.srem?(key, *members)
end

#sscan(key, cursor, **options) ⇒ Object

Scan a set



627
628
629
# File 'lib/redis/distributed.rb', line 627

# Scan a set.
#
# The cursor and any keyword options are forwarded as-is to the node that
# node_for picks for +key+.
def sscan(key, cursor, **options)
  node = node_for(key)
  node.sscan(key, cursor, **options)
end

#sscan_each(key, **options, &block) ⇒ Object

Scan a set and return an enumerator



632
633
634
# File 'lib/redis/distributed.rb', line 632

# Scan a set and return an enumerator.
#
# Keyword options and the optional block are forwarded as-is to the node
# that node_for picks for +key+.
def sscan_each(key, **options, &block)
  node = node_for(key)
  node.sscan_each(key, **options, &block)
end

#strlen(key) ⇒ Object

Get the length of the value stored in a key.



406
407
408
# File 'lib/redis/distributed.rb', line 406

# Get the length of the value stored in a key.
#
# Forwards to the node that node_for picks for +key+.
def strlen(key)
  owner = node_for(key)
  owner.strlen(key)
end

#subscribe(channel, *channels, &block) ⇒ Object

Listen for messages published to the given channels.



931
932
933
934
935
936
937
938
939
940
941
# File 'lib/redis/distributed.rb', line 931

# Listen for messages published to the given channels.
#
# With a single channel the node comes straight from node_for. With
# several channels the full list is handed to ensure_same_node, which
# yields the node to use. In both cases the chosen node is remembered in
# @subscribed_node before +subscribe+ is called on it.
def subscribe(channel, *channels, &block)
  if channels.empty?
    target = node_for(channel)
    @subscribed_node = target
    return target.subscribe(channel, &block)
  end

  ensure_same_node(:subscribe, [channel, *channels]) do |node|
    @subscribed_node = node
    node.subscribe(channel, *channels, &block)
  end
end

#subscribed? ⇒ Boolean

Returns:

  • (Boolean)


926
927
928
# File 'lib/redis/distributed.rb', line 926

# Reports whether a subscribed node is currently recorded.
#
# @return [Boolean] true when @subscribed_node is set, false otherwise
def subscribed?
  @subscribed_node ? true : false
end

#sunion(*keys) ⇒ Object

Add multiple sets (compute their union).



669
670
671
672
673
674
# File 'lib/redis/distributed.rb', line 669

# Compute the union of multiple sets.
#
# Keys may be passed as separate arguments or wrapped in arrays; one level
# of nesting is flattened. The flattened list is handed to
# ensure_same_node, which yields the node the command is sent to.
def sunion(*keys)
  set_keys = keys.flatten(1)
  ensure_same_node(:sunion, set_keys) { |node| node.sunion(set_keys) }
end

#sunionstore(destination, *keys) ⇒ Object

Add multiple sets (compute their union) and store the resulting set in a key.



677
678
679
680
681
682
# File 'lib/redis/distributed.rb', line 677

# Compute the union of multiple sets and store the result in a key.
#
# Source keys may be passed individually or as arrays (flattened one
# level). The destination followed by every source key is handed to
# ensure_same_node, which yields the node the command is sent to.
def sunionstore(destination, *keys)
  sources = keys.flatten(1)
  ensure_same_node(:sunionstore, [destination, *sources]) do |node|
    node.sunionstore(destination, sources)
  end
end

#time ⇒ Object

Get server time: a UNIX timestamp and the elapsed microseconds in the current second.



114
115
116
# File 'lib/redis/distributed.rb', line 114

# Get server time.
#
# Hands the :time command to on_each_node and returns whatever it returns.
def time
  on_each_node(:time)
end

#ttl(key) ⇒ Object

Get the time to live (in seconds) for a key.



139
140
141
# File 'lib/redis/distributed.rb', line 139

# Get the time to live (in seconds) for a key.
#
# Forwards to the node that node_for picks for +key+.
def ttl(key)
  node = node_for(key)
  node.ttl(key)
end

#type(key) ⇒ Object

Determine the type stored at key.



261
262
263
# File 'lib/redis/distributed.rb', line 261

# Determine the type stored at key.
#
# Forwards to the node that node_for picks for +key+.
def type(key)
  owner = node_for(key)
  owner.type(key)
end

#unlink(*args) ⇒ Object

Unlink keys.



188
189
190
191
192
193
194
# File 'lib/redis/distributed.rb', line 188

# Unlink keys.
#
# Keys may be passed individually or as arrays (flattened one level).
# They are grouped by the node that node_for returns for each key, +unlink+
# is called once per node with that node's keys, and the per-node results
# are added up. Returns 0 when no keys are given.
def unlink(*args)
  all_keys = args.flatten(1)
  all_keys.group_by { |key| node_for(key) }.sum do |node, node_keys|
    node.unlink(*node_keys)
  end
end

#unsubscribe(*channels) ⇒ Object

Stop listening for messages posted to the given channels.

Raises:

  • (SubscriptionError)

944
945
946
947
948
# File 'lib/redis/distributed.rb', line 944

# Stop listening for messages posted to the given channels.
#
# Forwards to the node recorded in @subscribed_node.
#
# @raise [SubscriptionError] when subscribed? is false
def unsubscribe(*channels)
  unless subscribed?
    raise SubscriptionError, "Can't unsubscribe if not subscribed."
  end

  @subscribed_node.unsubscribe(*channels)
end

#unwatch ⇒ Object

Forget about all watched keys.

Raises:

  • (CannotDistribute)

980
981
982
983
984
985
986
# File 'lib/redis/distributed.rb', line 980

# Forget about all watched keys.
#
# Calls +unwatch+ on the node that node_for picks for the remembered
# @watch_key, then clears @watch_key. The node's result is returned. If
# the node call raises, @watch_key is left as it was.
#
# @raise [CannotDistribute] when no watch key is recorded
def unwatch
  watched = @watch_key
  raise CannotDistribute, :unwatch unless watched

  node_for(watched).unwatch.tap { @watch_key = nil }
end

#watch(*keys, &block) ⇒ Object

Watch the given keys to determine execution of the MULTI/EXEC block.



966
967
968
969
970
971
972
973
974
975
976
977
# File 'lib/redis/distributed.rb', line 966

# Watch the given keys to determine execution of the MULTI/EXEC block.
#
# The keys are handed to ensure_same_node, which yields the node to use.
# Before delegating, @watch_key is set to key_tag of the first key or,
# when key_tag returns nil/false, to that key's string form. If the node's
# +watch+ raises a StandardError, @watch_key is cleared and the error is
# re-raised.
def watch(*keys, &block)
  ensure_same_node(:watch, keys) do |node|
    first_key = keys.first
    @watch_key = key_tag(first_key) || first_key.to_s

    # Only the node call is guarded: a failure here must drop the watch key.
    begin
      node.watch(*keys, &block)
    rescue StandardError
      @watch_key = nil
      raise
    end
  end
end

#zadd(key, *args) ⇒ Object

Add one or more members to a sorted set, or update the score for members that already exist.



691
692
693
# File 'lib/redis/distributed.rb', line 691

# Add one or more members to a sorted set, or update the score for members
# that already exist.
#
# All extra arguments are splatted through to the node that node_for picks
# for +key+.
# NOTE(review): keyword options reach the node as keywords only if this
# method is flagged with ruby2_keywords elsewhere in the file — confirm.
def zadd(key, *args)
  node = node_for(key)
  node.zadd(key, *args)
end

#zcard(key) ⇒ Object

Get the number of members in a sorted set.



685
686
687
# File 'lib/redis/distributed.rb', line 685

# Get the number of members in a sorted set.
#
# Forwards to the node that node_for picks for +key+.
def zcard(key)
  owner = node_for(key)
  owner.zcard(key)
end

#zcount(key, min, max) ⇒ Object

Get the number of members in a particular score range.



787
788
789
# File 'lib/redis/distributed.rb', line 787

# Get the number of members in a particular score range.
#
# +min+ and +max+ are forwarded unchanged to the node that node_for picks
# for +key+.
def zcount(key, min, max)
  node = node_for(key)
  node.zcount(key, min, max)
end

#zdiff(*keys, **options) ⇒ Object

Return the difference between the first and all successive input sorted sets.



825
826
827
828
829
830
# File 'lib/redis/distributed.rb', line 825

# Return the difference between the first and all successive input sorted
# sets.
#
# Keys may be passed individually or as arrays (flattened one level). The
# flattened list is handed to ensure_same_node, and keyword options are
# forwarded untouched to the yielded node.
def zdiff(*keys, **options)
  zset_keys = keys.flatten(1)
  ensure_same_node(:zdiff, zset_keys) { |node| node.zdiff(zset_keys, **options) }
end

#zdiffstore(destination, *keys, **options) ⇒ Object

Compute the difference between the first and all successive input sorted sets and store the resulting sorted set in a new key.



834
835
836
837
838
839
# File 'lib/redis/distributed.rb', line 834

# Compute the difference between the first and all successive input sorted
# sets and store the resulting sorted set in a new key.
#
# Source keys may be passed individually or as arrays (flattened one
# level). The destination followed by every source key is handed to
# ensure_same_node; keyword options are forwarded to the yielded node.
def zdiffstore(destination, *keys, **options)
  sources = keys.flatten(1)
  ensure_same_node(:zdiffstore, [destination, *sources]) do |node|
    node.zdiffstore(destination, sources, **options)
  end
end

#zincrby(key, increment, member) ⇒ Object

Increment the score of a member in a sorted set.



697
698
699
# File 'lib/redis/distributed.rb', line 697

# Increment the score of a member in a sorted set.
#
# Forwards unchanged to the node that node_for picks for +key+.
def zincrby(key, increment, member)
  node = node_for(key)
  node.zincrby(key, increment, member)
end

#zinter(*keys, **options) ⇒ Object

Get the intersection of multiple sorted sets



792
793
794
795
796
797
# File 'lib/redis/distributed.rb', line 792

# Get the intersection of multiple sorted sets.
#
# Keys may be passed individually or as arrays (flattened one level). The
# flattened list is handed to ensure_same_node, and keyword options are
# forwarded untouched to the yielded node.
def zinter(*keys, **options)
  zset_keys = keys.flatten(1)
  ensure_same_node(:zinter, zset_keys) { |node| node.zinter(zset_keys, **options) }
end

#zinterstore(destination, *keys, **options) ⇒ Object

Intersect multiple sorted sets and store the resulting sorted set in a new key.



801
802
803
804
805
806
# File 'lib/redis/distributed.rb', line 801

# Intersect multiple sorted sets and store the resulting sorted set in a
# new key.
#
# Source keys may be passed individually or as arrays (flattened one
# level). The destination followed by every source key is handed to
# ensure_same_node; keyword options are forwarded to the yielded node.
def zinterstore(destination, *keys, **options)
  sources = keys.flatten(1)
  ensure_same_node(:zinterstore, [destination, *sources]) do |node|
    node.zinterstore(destination, sources, **options)
  end
end

#zmpop(*keys, modifier: "MIN", count: nil) ⇒ Object

Iterate over keys, removing members from the first non empty sorted set found.



729
730
731
732
733
# File 'lib/redis/distributed.rb', line 729

# Iterate over keys, removing members from the first non empty sorted set
# found.
#
# The keys (not flattened here) are handed to ensure_same_node; +modifier+
# and +count+ are always passed on as keywords to the yielded node.
def zmpop(*keys, modifier: "MIN", count: nil)
  pop_options = { modifier: modifier, count: count }
  ensure_same_node(:zmpop, keys) { |node| node.zmpop(*keys, **pop_options) }
end

#zmscore(key, *members) ⇒ Object

Get the scores associated with the given members in a sorted set.



717
718
719
# File 'lib/redis/distributed.rb', line 717

# Get the scores associated with the given members in a sorted set.
#
# +members+ is splatted back out when forwarding to the node that
# node_for picks for +key+.
def zmscore(key, *members)
  node = node_for(key)
  node.zmscore(key, *members)
end

#zrandmember(key, count = nil, **options) ⇒ Object

Get one or more random members from a sorted set.



712
713
714
# File 'lib/redis/distributed.rb', line 712

# Get one or more random members from a sorted set.
#
# +count+ (nil by default) and any keyword options are passed through to
# the node that node_for picks for +key+.
def zrandmember(key, count = nil, **options)
  node = node_for(key)
  node.zrandmember(key, count, **options)
end

#zrange(key, start, stop, **options) ⇒ Object

Return a range of members in a sorted set, by index, score or lexicographical ordering.



736
737
738
# File 'lib/redis/distributed.rb', line 736

# Return a range of members in a sorted set, by index, score or
# lexicographical ordering.
#
# Bounds and keyword options are forwarded unchanged to the node that
# node_for picks for +key+.
def zrange(key, start, stop, **options)
  node = node_for(key)
  node.zrange(key, start, stop, **options)
end

#zrangebyscore(key, min, max, **options) ⇒ Object

Return a range of members in a sorted set, by score.



771
772
773
# File 'lib/redis/distributed.rb', line 771

# Return a range of members in a sorted set, by score.
#
# Bounds and keyword options are forwarded unchanged to the node that
# node_for picks for +key+.
def zrangebyscore(key, min, max, **options)
  node = node_for(key)
  node.zrangebyscore(key, min, max, **options)
end

#zrangestore(dest_key, src_key, start, stop, **options) ⇒ Object

Select a range of members in a sorted set, by index, score or lexicographical ordering and store the resulting sorted set in a new key.



742
743
744
745
746
# File 'lib/redis/distributed.rb', line 742

# Select a range of members in a sorted set, by index, score or
# lexicographical ordering and store the resulting sorted set in a new key.
#
# The destination and source keys are handed to ensure_same_node; all
# arguments and keyword options are forwarded to the yielded node.
def zrangestore(dest_key, src_key, start, stop, **options)
  involved_keys = [dest_key, src_key]
  ensure_same_node(:zrangestore, involved_keys) do |node|
    node.zrangestore(dest_key, src_key, start, stop, **options)
  end
end

#zrank(key, member) ⇒ Object

Determine the index of a member in a sorted set.



755
756
757
# File 'lib/redis/distributed.rb', line 755

# Determine the index of a member in a sorted set.
#
# Forwards unchanged to the node that node_for picks for +key+.
def zrank(key, member)
  owner = node_for(key)
  owner.zrank(key, member)
end

#zrem(key, member) ⇒ Object

Remove one or more members from a sorted set.



702
703
704
# File 'lib/redis/distributed.rb', line 702

# Remove one or more members from a sorted set.
#
# +member+ is forwarded as given (a single value or whatever collection
# the caller supplies) to the node that node_for picks for +key+.
def zrem(key, member)
  node = node_for(key)
  node.zrem(key, member)
end

#zremrangebyrank(key, start, stop) ⇒ Object

Remove all members in a sorted set within the given indexes.



766
767
768
# File 'lib/redis/distributed.rb', line 766

# Remove all members in a sorted set within the given indexes.
#
# Forwards unchanged to the node that node_for picks for +key+.
def zremrangebyrank(key, start, stop)
  node = node_for(key)
  node.zremrangebyrank(key, start, stop)
end

#zremrangebyscore(key, min, max) ⇒ Object

Remove all members in a sorted set within the given scores.



782
783
784
# File 'lib/redis/distributed.rb', line 782

# Remove all members in a sorted set within the given scores.
#
# Forwards unchanged to the node that node_for picks for +key+.
def zremrangebyscore(key, min, max)
  node = node_for(key)
  node.zremrangebyscore(key, min, max)
end

#zrevrange(key, start, stop, **options) ⇒ Object

Return a range of members in a sorted set, by index, with scores ordered from high to low.



750
751
752
# File 'lib/redis/distributed.rb', line 750

# Return a range of members in a sorted set, by index, with scores ordered
# from high to low.
#
# Bounds and keyword options are forwarded unchanged to the node that
# node_for picks for +key+.
def zrevrange(key, start, stop, **options)
  node = node_for(key)
  node.zrevrange(key, start, stop, **options)
end

#zrevrangebyscore(key, max, min, **options) ⇒ Object

Return a range of members in a sorted set, by score, with scores ordered from high to low.



777
778
779
# File 'lib/redis/distributed.rb', line 777

# Return a range of members in a sorted set, by score, with scores ordered
# from high to low.
#
# Note the argument order: +max+ comes before +min+. Both, along with any
# keyword options, are forwarded unchanged to the node that node_for picks
# for +key+.
def zrevrangebyscore(key, max, min, **options)
  node = node_for(key)
  node.zrevrangebyscore(key, max, min, **options)
end

#zrevrank(key, member) ⇒ Object

Determine the index of a member in a sorted set, with scores ordered from high to low.



761
762
763
# File 'lib/redis/distributed.rb', line 761

# Determine the index of a member in a sorted set, with scores ordered
# from high to low.
#
# Forwards unchanged to the node that node_for picks for +key+.
def zrevrank(key, member)
  owner = node_for(key)
  owner.zrevrank(key, member)
end

#zscore(key, member) ⇒ Object

Get the score associated with the given member in a sorted set.



707
708
709
# File 'lib/redis/distributed.rb', line 707

# Get the score associated with the given member in a sorted set.
#
# Forwards unchanged to the node that node_for picks for +key+.
def zscore(key, member)
  owner = node_for(key)
  owner.zscore(key, member)
end

#zunion(*keys, **options) ⇒ Object

Return the union of multiple sorted sets.



809
810
811
812
813
814
# File 'lib/redis/distributed.rb', line 809

# Return the union of multiple sorted sets.
#
# Keys may be passed individually or as arrays (flattened one level). The
# flattened list is handed to ensure_same_node, and keyword options are
# forwarded untouched to the yielded node.
def zunion(*keys, **options)
  zset_keys = keys.flatten(1)
  ensure_same_node(:zunion, zset_keys) { |node| node.zunion(zset_keys, **options) }
end

#zunionstore(destination, *keys, **options) ⇒ Object

Add multiple sorted sets and store the resulting sorted set in a new key.



817
818
819
820
821
822
# File 'lib/redis/distributed.rb', line 817

# Add multiple sorted sets and store the resulting sorted set in a new key.
#
# Source keys may be passed individually or as arrays (flattened one
# level). The destination followed by every source key is handed to
# ensure_same_node; keyword options are forwarded to the yielded node.
def zunionstore(destination, *keys, **options)
  sources = keys.flatten(1)
  ensure_same_node(:zunionstore, [destination, *sources]) do |node|
    node.zunionstore(destination, sources, **options)
  end
end