Class: Octopus::Proxy
- Inherits:
-
Object
show all
- Defined in:
- lib/octopus/proxy.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#check_schema_migrations(shard) ⇒ Object
-
#clean_connection_proxy ⇒ Object
-
#clear_active_connections! ⇒ Object
-
#clear_all_connections! ⇒ Object
-
#clear_query_cache ⇒ Object
-
#connected? ⇒ Boolean
-
#connection_pool ⇒ Object
-
#current_model_replicated? ⇒ Boolean
-
#delete(*args, &block) ⇒ Object
-
#disable_query_cache! ⇒ Object
-
#enable_query_cache! ⇒ Object
-
#execute(sql, name = nil) ⇒ Object
-
#initialize(config = Octopus.config) ⇒ Proxy
constructor
-
#initialize_metadata_table ⇒ Object
-
#initialize_schema_migrations_table ⇒ Object
-
#insert(arel, name = nil, pk = nil, id_value = nil, sequence_name = nil, binds = []) ⇒ Object
-
#method_missing(method, *args, &block) ⇒ Object
-
#respond_to?(method, include_private = false) ⇒ Boolean
-
#run_queries_on_shard(shard, &_block) ⇒ Object
-
#safe_connection(connection_pool) ⇒ Object
Rails 3.1 sets automatic_reconnect to false when it removes connection pool.
-
#select_all(*args, &block) ⇒ Object
-
#select_connection ⇒ Object
-
#select_value(*args, &block) ⇒ Object
-
#send_queries_to_all_shards(&block) ⇒ Object
-
#send_queries_to_group(group, &block) ⇒ Object
-
#send_queries_to_multiple_shards(shards, &block) ⇒ Object
-
#send_queries_to_shard_slave_group(method, *args, &block) ⇒ Object
-
#send_queries_to_slave_group(method, *args, &block) ⇒ Object
-
#should_send_queries_to_shard_slave_group?(method) ⇒ Boolean
-
#should_send_queries_to_slave_group?(method) ⇒ Boolean
-
#transaction(options = {}, &block) ⇒ Object
-
#update(arel, name = nil, binds = []) ⇒ Object
Constructor Details
#initialize(config = Octopus.config) ⇒ Proxy
Returns a new instance of Proxy.
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method, *args, &block) ⇒ Object
130
131
132
|
# File 'lib/octopus/proxy.rb', line 130
def method_missing(method, *args, &block)
legacy_method_missing_logic(method, *args, &block)
end
|
Instance Attribute Details
#proxy_config ⇒ Object
Returns the value of attribute proxy_config.
7
8
9
|
# File 'lib/octopus/proxy.rb', line 7
def proxy_config
@proxy_config
end
|
Instance Method Details
#check_schema_migrations(shard) ⇒ Object
114
115
116
117
118
|
# File 'lib/octopus/proxy.rb', line 114
def check_schema_migrations(shard)
OctopusModel.using(shard).connection.table_exists?(
ActiveRecord::Migrator.schema_migrations_table_name,
) || OctopusModel.using(shard).connection.initialize_schema_migrations_table
end
|
#clean_connection_proxy ⇒ Object
107
108
109
110
111
112
|
# File 'lib/octopus/proxy.rb', line 107
def clean_connection_proxy
self.current_shard = Octopus.master_shard
self.current_model = nil
self.current_group = nil
self.block = nil
end
|
#clear_active_connections! ⇒ Object
157
158
159
|
# File 'lib/octopus/proxy.rb', line 157
def clear_active_connections!
with_each_healthy_shard(&:release_connection)
end
|
#clear_all_connections! ⇒ Object
161
162
163
164
165
166
167
168
169
170
|
# File 'lib/octopus/proxy.rb', line 161
def clear_all_connections!
with_each_healthy_shard(&:disconnect!)
if Octopus.atleast_rails52?
proxy_config.reinitialize_shards
end
end
|
#clear_query_cache ⇒ Object
153
154
155
|
# File 'lib/octopus/proxy.rb', line 153
def clear_query_cache
with_each_healthy_shard { |v| v.connected? && safe_connection(v).clear_query_cache }
end
|
#connected? ⇒ Boolean
172
173
174
|
# File 'lib/octopus/proxy.rb', line 172
def connected?
shards.any? { |_k, v| v.connected? }
end
|
#connection_pool ⇒ Object
138
139
140
|
# File 'lib/octopus/proxy.rb', line 138
def connection_pool
shards[current_shard]
end
|
#current_model_replicated? ⇒ Boolean
192
193
194
|
# File 'lib/octopus/proxy.rb', line 192
def current_model_replicated?
replicated && (current_model.try(:replicated) || fully_replicated?)
end
|
#delete(*args, &block) ⇒ Object
55
56
57
|
# File 'lib/octopus/proxy.rb', line 55
def delete(*args, &block)
legacy_method_missing_logic('delete', *args, &block)
end
|
#disable_query_cache! ⇒ Object
148
149
150
|
# File 'lib/octopus/proxy.rb', line 148
def disable_query_cache!
with_each_healthy_shard { |v| v.connected? && safe_connection(v).disable_query_cache! }
end
|
#enable_query_cache! ⇒ Object
143
144
145
146
|
# File 'lib/octopus/proxy.rb', line 143
def enable_query_cache!
clear_query_cache
with_each_healthy_shard { |v| v.connected? && safe_connection(v).enable_query_cache! }
end
|
#execute(sql, name = nil) ⇒ Object
36
37
38
39
40
|
# File 'lib/octopus/proxy.rb', line 36
def execute(sql, name = nil)
conn = select_connection
clean_connection_proxy if should_clean_connection_proxy?('execute')
conn.execute(sql, name)
end
|
204
205
206
|
# File 'lib/octopus/proxy.rb', line 204
def initialize_metadata_table
select_connection.transaction { ActiveRecord::InternalMetadata.create_table }
end
|
#initialize_schema_migrations_table ⇒ Object
196
197
198
199
200
201
202
|
# File 'lib/octopus/proxy.rb', line 196
def initialize_schema_migrations_table
if Octopus.atleast_rails52?
select_connection.transaction { ActiveRecord::SchemaMigration.create_table }
else
select_connection.initialize_schema_migrations_table
end
end
|
#insert(arel, name = nil, pk = nil, id_value = nil, sequence_name = nil, binds = []) ⇒ Object
42
43
44
45
46
|
# File 'lib/octopus/proxy.rb', line 42
def insert(arel, name = nil, pk = nil, id_value = nil, sequence_name = nil, binds = [])
conn = select_connection
clean_connection_proxy if should_clean_connection_proxy?('insert')
conn.insert(arel, name, pk, id_value, sequence_name, binds)
end
|
#respond_to?(method, include_private = false) ⇒ Boolean
134
135
136
|
# File 'lib/octopus/proxy.rb', line 134
def respond_to?(method, include_private = false)
super || select_connection.respond_to?(method, include_private)
end
|
#run_queries_on_shard(shard, &_block) ⇒ Object
83
84
85
86
87
88
89
|
# File 'lib/octopus/proxy.rb', line 83
def run_queries_on_shard(shard, &_block)
keeping_connection_proxy(shard) do
using_shard(shard) do
yield
end
end
end
|
#safe_connection(connection_pool) ⇒ Object
Rails 3.1 sets automatic_reconnect to false when it removes connection pool. Octopus can potentially retain a reference to a closed connection pool. Previously, that would work since the pool would just reconnect, but in Rails 3.1 the flag prevents this.
71
72
73
74
75
76
77
|
# File 'lib/octopus/proxy.rb', line 71
def safe_connection(connection_pool)
connection_pool.automatic_reconnect ||= true
if !connection_pool.connected? && shards[Octopus.master_shard].connection.query_cache_enabled
connection_pool.connection.enable_query_cache!
end
connection_pool.connection
end
|
#select_all(*args, &block) ⇒ Object
59
60
61
|
# File 'lib/octopus/proxy.rb', line 59
def select_all(*args, &block)
legacy_method_missing_logic('select_all', *args, &block)
end
|
#select_connection ⇒ Object
79
80
81
|
# File 'lib/octopus/proxy.rb', line 79
def select_connection
safe_connection(shards[shard_name])
end
|
#select_value(*args, &block) ⇒ Object
63
64
65
|
# File 'lib/octopus/proxy.rb', line 63
def select_value(*args, &block)
legacy_method_missing_logic('select_value', *args, &block)
end
|
#send_queries_to_all_shards(&block) ⇒ Object
103
104
105
|
# File 'lib/octopus/proxy.rb', line 103
def send_queries_to_all_shards(&block)
send_queries_to_multiple_shards(shard_names.uniq { |shard_name| shards[shard_name] }, &block)
end
|
#send_queries_to_group(group, &block) ⇒ Object
97
98
99
100
101
|
# File 'lib/octopus/proxy.rb', line 97
def send_queries_to_group(group, &block)
using_group(group) do
send_queries_to_multiple_shards(shards_for_group(group), &block)
end
end
|
#send_queries_to_multiple_shards(shards, &block) ⇒ Object
91
92
93
94
95
|
# File 'lib/octopus/proxy.rb', line 91
def send_queries_to_multiple_shards(shards, &block)
shards.map do |shard|
run_queries_on_shard(shard, &block)
end
end
|
#send_queries_to_shard_slave_group(method, *args, &block) ⇒ Object
180
181
182
|
# File 'lib/octopus/proxy.rb', line 180
def send_queries_to_shard_slave_group(method, *args, &block)
send_queries_to_balancer(shards_slave_groups[current_shard][current_slave_group], method, *args, &block)
end
|
#send_queries_to_slave_group(method, *args, &block) ⇒ Object
188
189
190
|
# File 'lib/octopus/proxy.rb', line 188
def send_queries_to_slave_group(method, *args, &block)
send_queries_to_balancer(slave_groups[current_slave_group], method, *args, &block)
end
|
#should_send_queries_to_shard_slave_group?(method) ⇒ Boolean
176
177
178
|
# File 'lib/octopus/proxy.rb', line 176
def should_send_queries_to_shard_slave_group?(method)
should_use_slaves_for_method?(method) && shards_slave_groups.try(:[], current_shard).try(:[], current_slave_group).present?
end
|
#should_send_queries_to_slave_group?(method) ⇒ Boolean
184
185
186
|
# File 'lib/octopus/proxy.rb', line 184
def should_send_queries_to_slave_group?(method)
should_use_slaves_for_method?(method) && slave_groups.try(:[], current_slave_group).present?
end
|
#transaction(options = {}, &block) ⇒ Object
120
121
122
123
124
125
126
127
128
|
# File 'lib/octopus/proxy.rb', line 120
def transaction(options = {}, &block)
if !sharded && current_model_replicated?
run_queries_on_shard(Octopus.master_shard) do
select_connection.transaction(options, &block)
end
else
select_connection.transaction(options, &block)
end
end
|
#update(arel, name = nil, binds = []) ⇒ Object
48
49
50
51
52
53
|
# File 'lib/octopus/proxy.rb', line 48
def update(arel, name = nil, binds = [])
conn = select_connection
clean_connection_proxy if should_clean_connection_proxy?('insert')
conn.update(arel, name, binds)
end
|