Class: RedisClient::Cluster
- Inherits:
-
Object
- Object
- RedisClient::Cluster
show all
- Defined in:
- lib/redis_client/cluster.rb,
lib/redis_client/cluster/node.rb,
lib/redis_client/cluster/errors.rb,
lib/redis_client/cluster/router.rb,
lib/redis_client/cluster/command.rb,
lib/redis_client/cluster/pub_sub.rb,
lib/redis_client/cluster/node_key.rb,
lib/redis_client/cluster/pipeline.rb,
lib/redis_client/cluster/transaction.rb,
lib/redis_client/cluster/concurrent_worker.rb,
lib/redis_client/cluster/node/primary_only.rb,
lib/redis_client/cluster/key_slot_converter.rb,
lib/redis_client/cluster/node/base_topology.rb,
lib/redis_client/cluster/optimistic_locking.rb,
lib/redis_client/cluster/node/random_replica.rb,
lib/redis_client/cluster/normalized_cmd_name.rb,
lib/redis_client/cluster/error_identification.rb,
lib/redis_client/cluster/node/latency_replica.rb,
lib/redis_client/cluster/concurrent_worker/none.rb,
lib/redis_client/cluster/concurrent_worker/pooled.rb,
lib/redis_client/cluster/concurrent_worker/on_demand.rb,
lib/redis_client/cluster/node/random_replica_or_primary.rb
Defined Under Namespace
Modules: ConcurrentWorker, ErrorIdentification, KeySlotConverter, NodeKey
Classes: AmbiguousNodeError, Command, ErrorCollection, InitialSetupError, Node, NodeMightBeDown, NormalizedCmdName, OptimisticLocking, OrchestrationCommandNotSupported, Pipeline, PubSub, Router, Transaction
Constant Summary
collapse
- ZERO_CURSOR_FOR_SCAN =
'0'
- ERR_ARG_NORMALIZATION =
->(arg) { Array[arg].flatten.reject { |e| e.nil? || (e.respond_to?(:empty?) && e.empty?) } }
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#blocking_call(timeout, *args, **kwargs, &block) ⇒ Object
-
#blocking_call_v(timeout, command, &block) ⇒ Object
-
#call(*args, **kwargs, &block) ⇒ Object
-
#call_once(*args, **kwargs, &block) ⇒ Object
-
#call_once_v(command, &block) ⇒ Object
-
#call_v(command, &block) ⇒ Object
-
#close ⇒ Object
-
#hscan(key, *args, **kwargs, &block) ⇒ Object
-
#initialize(config, pool: nil, concurrency: nil, **kwargs) ⇒ Cluster
constructor
A new instance of Cluster.
-
#inspect ⇒ Object
-
#multi(watch: nil) ⇒ Object
-
#pipelined(exception: true) {|pipeline| ... } ⇒ Object
-
#pubsub ⇒ Object
-
#scan(*args, **kwargs, &block) ⇒ Object
-
#sscan(key, *args, **kwargs, &block) ⇒ Object
-
#with ⇒ Object
-
#zscan(key, *args, **kwargs, &block) ⇒ Object
Constructor Details
#initialize(config, pool: nil, concurrency: nil, **kwargs) ⇒ Cluster
Returns a new instance of Cluster.
16
17
18
19
20
21
22
23
24
25
|
# File 'lib/redis_client/cluster.rb', line 16
# Captures the configuration and options; the router is left unset here.
#
# @param config [Object] cluster configuration; must respond to #command_builder
# @param pool [Object, nil] stored unchanged in @pool
# @param concurrency [Hash, nil] options splatted into ConcurrentWorker.create;
#   nil is treated as "no options"
# @param kwargs [Hash] remaining keyword arguments, stored unchanged in @kwargs
def initialize(config, pool: nil, concurrency: nil, **kwargs)
  worker_options = concurrency || {}

  @config = config
  @concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(**worker_options)
  @command_builder = config.command_builder
  @pool = pool
  @kwargs = kwargs
  @router = nil # no router is created during construction
  @mutex = Mutex.new
end
|
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(name, *args, **kwargs, &block) ⇒ Object
145
146
147
148
149
150
151
152
153
|
# File 'lib/redis_client/cluster.rb', line 145
# Treats an unknown method as a command when the router reports that a command
# with that name exists; otherwise defers to the default behaviour via super.
#
# The method name is placed in front of the positional arguments, the command
# is built with @command_builder, and it is dispatched as :call_v.
#
# @param name [Symbol] the missing method's name
# @return [Object] whatever router.send_command returns
def method_missing(name, *args, **kwargs, &block)
  return super unless router.command_exists?(name)

  built = @command_builder.generate([name, *args], kwargs)
  router.send_command(:call_v, built, &block)
end
|
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
14
15
16
|
# File 'lib/redis_client/cluster.rb', line 14
# Reader for the @config instance variable.
#
# @return [Object] the value currently held in @config
def config
@config
end
|
Instance Method Details
#blocking_call(timeout, *args, **kwargs, &block) ⇒ Object
52
53
54
55
|
# File 'lib/redis_client/cluster.rb', line 52
# Builds a command from the positional and keyword arguments and dispatches it
# through the router as :blocking_call_v, passing the timeout after the command.
#
# @param timeout [Object] forwarded unchanged to router.send_command
# @return [Object] whatever router.send_command returns
def blocking_call(timeout, *args, **kwargs, &block)
  built = @command_builder.generate(args, kwargs)
  router.send_command(:blocking_call_v, built, timeout, &block)
end
|
#blocking_call_v(timeout, command, &block) ⇒ Object
57
58
59
60
|
# File 'lib/redis_client/cluster.rb', line 57
# Passes an already-assembled command through @command_builder.generate and
# dispatches it through the router as :blocking_call_v, with the timeout after
# the command.
#
# @param timeout [Object] forwarded unchanged to router.send_command
# @param command [Object] handed to @command_builder.generate as its only argument
# @return [Object] whatever router.send_command returns
def blocking_call_v(timeout, command, &block)
  built = @command_builder.generate(command)
  router.send_command(:blocking_call_v, built, timeout, &block)
end
|
#call(*args, **kwargs, &block) ⇒ Object
32
33
34
35
|
# File 'lib/redis_client/cluster.rb', line 32
# Builds a command from the positional and keyword arguments and dispatches it
# through the router as :call_v. A given block is forwarded to the router.
#
# @return [Object] whatever router.send_command returns
def call(*args, **kwargs, &block)
  built = @command_builder.generate(args, kwargs)
  router.send_command(:call_v, built, &block)
end
|
#call_once(*args, **kwargs, &block) ⇒ Object
42
43
44
45
|
# File 'lib/redis_client/cluster.rb', line 42
# Builds a command from the positional and keyword arguments and dispatches it
# through the router as :call_once_v. A given block is forwarded to the router.
#
# @return [Object] whatever router.send_command returns
def call_once(*args, **kwargs, &block)
  built = @command_builder.generate(args, kwargs)
  router.send_command(:call_once_v, built, &block)
end
|
#call_once_v(command, &block) ⇒ Object
47
48
49
50
|
# File 'lib/redis_client/cluster.rb', line 47
# Passes an already-assembled command through @command_builder.generate and
# dispatches it through the router as :call_once_v.
#
# @param command [Object] handed to @command_builder.generate as its only argument
# @return [Object] whatever router.send_command returns
def call_once_v(command, &block)
  built = @command_builder.generate(command)
  router.send_command(:call_once_v, built, &block)
end
|
#call_v(command, &block) ⇒ Object
37
38
39
40
|
# File 'lib/redis_client/cluster.rb', line 37
# Passes an already-assembled command through @command_builder.generate and
# dispatches it through the router as :call_v.
#
# @param command [Object] handed to @command_builder.generate as its only argument
# @return [Object] whatever router.send_command returns
def call_v(command, &block)
  built = @command_builder.generate(command)
  router.send_command(:call_v, built, &block)
end
|
#close ⇒ Object
129
130
131
132
133
|
# File 'lib/redis_client/cluster.rb', line 129
# Closes the router, if one has been assigned, and then the concurrent worker.
#
# The worker is closed in an `ensure` clause so that it is still released when
# closing the router raises; the router's error then propagates to the caller.
#
# @return [nil]
def close
  @router&.close
  nil
ensure
  # Runs on both the normal and the error path; does not alter the return value.
  @concurrent_worker.close
end
|
#hscan(key, *args, **kwargs, &block) ⇒ Object
79
80
81
82
|
# File 'lib/redis_client/cluster.rb', line 79
# Asks the router which node handles ['HSCAN', key] and delegates :hscan to
# that node through router.try_delegate, forwarding all arguments and the block.
#
# @param key [Object] the key used for node assignment and passed on as the first argument
# @return [Object] whatever router.try_delegate returns
def hscan(key, *args, **kwargs, &block)
  target_node = router.assign_node(['HSCAN', key])
  router.try_delegate(target_node, :hscan, key, *args, **kwargs, &block)
end
|
#inspect ⇒ Object
27
28
29
30
|
# File 'lib/redis_client/cluster.rb', line 27
# Short description made of the class name and a comma-separated list of node keys.
#
# While @router is nil the keys come from @config.startup_nodes; otherwise they
# come from router.node_keys.
#
# @return [String]
def inspect
  keys = if @router.nil?
           @config.startup_nodes.keys
         else
           router.node_keys
         end
  listing = keys.join(', ')
  "#<#{self.class.name} #{listing}>"
end
|
#multi(watch: nil) ⇒ Object
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# File 'lib/redis_client/cluster.rb', line 105
# Yields a Transaction to the caller's block and then executes it.
#
# Without watch keys (nil or empty) the transaction is built from the router
# and command builder alone. With watch keys the work runs inside
# OptimisticLocking#watch, and the transaction is additionally given the node,
# slot and asking values that #watch yields.
#
# @param watch [#empty?, nil] keys to watch; nil or empty selects the plain path
# @yield [transaction] the transaction to fill with commands
# @return [Object] the result of transaction.execute on the plain path,
#   otherwise whatever OptimisticLocking#watch returns
def multi(watch: nil)
  if watch.nil? || watch.empty?
    tx = ::RedisClient::Cluster::Transaction.new(router, @command_builder)
    yield tx
    tx.execute
  else
    ::RedisClient::Cluster::OptimisticLocking.new(router).watch(watch) do |node, slot, asking|
      tx = ::RedisClient::Cluster::Transaction.new(
        router, @command_builder, node: node, slot: slot, asking: asking
      )
      yield tx
      tx.execute
    end
  end
end
|
#pipelined(exception: true) {|pipeline| ... } ⇒ Object
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
# File 'lib/redis_client/cluster.rb', line 89
# Builds a Pipeline, yields it so the caller can queue commands, then executes it.
#
# The seed handed to the pipeline is nil when the config uses replicas with
# :random affinity; otherwise it is a fresh Random.new_seed.
#
# @param exception [Boolean] forwarded to the Pipeline as its `exception:` option
# @yield [pipeline] the pipeline to queue commands on
# @return [Array, Object] an empty array if the pipeline is empty after the
#   block ran, otherwise the result of pipeline.execute
def pipelined(exception: true)
  random_replica = @config.use_replica? && @config.replica_affinity == :random
  seed = random_replica ? nil : Random.new_seed

  pipeline = ::RedisClient::Cluster::Pipeline.new(
    router, @command_builder, @concurrent_worker, exception: exception, seed: seed
  )
  yield pipeline

  pipeline.empty? ? [] : pipeline.execute
end
|
#scan(*args, **kwargs, &block) ⇒ Object
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/redis_client/cluster.rb', line 62
# Repeatedly calls router.scan, starting from ZERO_CURSOR_FOR_SCAN, and passes
# every returned key to the block until the cursor comes back as
# ZERO_CURSOR_FOR_SCAN again. A single seed is generated up front and reused
# for every router.scan call.
#
# @yield [key] each key returned by router.scan
# @raise [ArgumentError] when no block is given
# @return [nil]
def scan(*args, **kwargs, &block)
  raise ArgumentError, 'block required' if block.nil?

  shared_seed = Random.new_seed
  position = ZERO_CURSOR_FOR_SCAN
  loop do
    position, batch = router.scan('SCAN', position, *args, seed: shared_seed, **kwargs)
    batch.each(&block)
    return if position == ZERO_CURSOR_FOR_SCAN
  end
end
|
#sscan(key, *args, **kwargs, &block) ⇒ Object
74
75
76
77
|
# File 'lib/redis_client/cluster.rb', line 74
# Asks the router which node handles ['SSCAN', key] and delegates :sscan to
# that node through router.try_delegate, forwarding all arguments and the block.
#
# @param key [Object] the key used for node assignment and passed on as the first argument
# @return [Object] whatever router.try_delegate returns
def sscan(key, *args, **kwargs, &block)
  target_node = router.assign_node(['SSCAN', key])
  router.try_delegate(target_node, :sscan, key, *args, **kwargs, &block)
end
|
#with ⇒ Object
125
126
127
|
# File 'lib/redis_client/cluster.rb', line 125
# Unsupported on this class: accepts any arguments (and a block) through `...`,
# ignores them, and always raises.
#
# @raise [NotImplementedError] unconditionally, with the message 'No way to use'
def with(...)
raise NotImplementedError, 'No way to use'
end
|
#zscan(key, *args, **kwargs, &block) ⇒ Object
84
85
86
87
|
# File 'lib/redis_client/cluster.rb', line 84
# Asks the router which node handles ['ZSCAN', key] and delegates :zscan to
# that node through router.try_delegate, forwarding all arguments and the block.
#
# @param key [Object] the key used for node assignment and passed on as the first argument
# @return [Object] whatever router.try_delegate returns
def zscan(key, *args, **kwargs, &block)
  target_node = router.assign_node(['ZSCAN', key])
  router.try_delegate(target_node, :zscan, key, *args, **kwargs, &block)
end
|