Class: Octopus::Proxy

Inherits:
Object
  • Object
show all
Defined in:
lib/octopus/proxy.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = Octopus.config) ⇒ Proxy

Returns a new instance of Proxy.



6
7
8
9
# File 'lib/octopus/proxy.rb', line 6

def initialize(config = Octopus.config)
  initialize_shards(config)
  initialize_replication(config) if !config.nil? && config["replicated"]
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args, &block) ⇒ Object



209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/octopus/proxy.rb', line 209

def method_missing(method, *args, &block)
  if should_clean_connection?(method)
    conn = select_connection()
    self.last_current_shard = self.current_shard
    clean_proxy()
    conn.send(method, *args, &block)
  elsif should_send_queries_to_replicated_databases?(method)
    send_queries_to_selected_slave(method, *args, &block)
  else
    select_connection().send(method, *args, &block)
  end
end

Instance Attribute Details

#configObject

Returns the value of attribute config.



4
5
6
# File 'lib/octopus/proxy.rb', line 4

def config
  @config
end

Instance Method Details

#blockObject



104
105
106
# File 'lib/octopus/proxy.rb', line 104

def block
  Thread.current["octopus.block"]
end

#block=(block) ⇒ Object



108
109
110
# File 'lib/octopus/proxy.rb', line 108

def block=(block)
  Thread.current["octopus.block"] = block
end

#check_schema_migrations(shard) ⇒ Object



193
194
195
196
197
# File 'lib/octopus/proxy.rb', line 193

def check_schema_migrations(shard)
  if !OctopusModel.using(shard).connection.table_exists?(ActiveRecord::Migrator.schema_migrations_table_name())
    OctopusModel.using(shard).connection.initialize_schema_migrations_table
  end
end

#clean_proxyObject



187
188
189
190
191
# File 'lib/octopus/proxy.rb', line 187

def clean_proxy()
  self.current_shard = :master
  self.current_group = nil
  self.block = false
end

#clear_all_query_caches!Object



242
243
244
# File 'lib/octopus/proxy.rb', line 242

def clear_all_query_caches!
  @shards.each { |k, v| safe_connection(v).clear_query_cache_without_octopus }
end

#connection_poolObject



226
227
228
# File 'lib/octopus/proxy.rb', line 226

def connection_pool
  return @shards[current_shard]
end

#current_groupObject



91
92
93
# File 'lib/octopus/proxy.rb', line 91

def current_group
  Thread.current["octopus.current_group"]
end

#current_group=(group_symbol) ⇒ Object



95
96
97
98
99
100
101
102
# File 'lib/octopus/proxy.rb', line 95

def current_group=(group_symbol)
  # TODO: Error message should include all groups if given more than one bad name.
  [group_symbol].flatten.compact.each do |group|
    raise "Nonexistent Group Name: #{group}" unless has_group?(group)
  end

  Thread.current["octopus.current_group"] = group_symbol
end

#current_modelObject



69
70
71
# File 'lib/octopus/proxy.rb', line 69

def current_model
  Thread.current["octopus.current_model"]
end

#current_model=(model) ⇒ Object



73
74
75
# File 'lib/octopus/proxy.rb', line 73

def current_model=(model)
  Thread.current["octopus.current_model"] = model.is_a?(ActiveRecord::Base) ? model.class : model
end

#current_shardObject



77
78
79
# File 'lib/octopus/proxy.rb', line 77

def current_shard
  Thread.current["octopus.current_shard"] ||= :master
end

#current_shard=(shard_symbol) ⇒ Object



81
82
83
84
85
86
87
88
89
# File 'lib/octopus/proxy.rb', line 81

def current_shard=(shard_symbol)
  if shard_symbol.is_a?(Array)
    shard_symbol.each {|symbol| raise "Nonexistent Shard Name: #{symbol}" if @shards[symbol].nil? }
  else
    raise "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil?
  end

  Thread.current["octopus.current_shard"] = shard_symbol
end

#disable_query_cache!Object



238
239
240
# File 'lib/octopus/proxy.rb', line 238

def disable_query_cache!
  @shards.each { |k, v| safe_connection(v).disable_query_cache! }
end

#enable_query_cache!Object



230
231
232
233
234
235
236
# File 'lib/octopus/proxy.rb', line 230

def enable_query_cache!
  @shards.each do |k, v|
    c = safe_connection(v)
    c.clear_query_cache_without_octopus
    c.enable_query_cache!
  end
end

#has_group?(group) ⇒ Boolean

Public: Whether or not a group exists with the given name converted to a string.

Returns a boolean.

Returns:

  • (Boolean)


124
125
126
# File 'lib/octopus/proxy.rb', line 124

def has_group?(group)
  @groups.has_key?(group.to_s)
end

#initialize_replication(config) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/octopus/proxy.rb', line 56

def initialize_replication(config)
  @replicated = true
  if config.has_key?("fully_replicated")
    @fully_replicated = config["fully_replicated"]
  else
    @fully_replicated = true
  end

  @slaves_list = @shards.keys.map {|sym| sym.to_s}.sort
  @slaves_list.delete('master')
  @slave_index = 0
end

#initialize_shards(config) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/octopus/proxy.rb', line 11

def initialize_shards(config)
  @shards = HashWithIndifferentAccess.new
  @groups = {}
  @adapters = Set.new
  @shards[:master] = ActiveRecord::Base.connection_pool_without_octopus()
  @config = ActiveRecord::Base.connection_pool_without_octopus.connection.instance_variable_get(:@config)

  if !config.nil? && config.has_key?("verify_connection")
    @verify_connection = config["verify_connection"]
  else
    @verify_connection = false
  end

  if !config.nil?
    @entire_sharded = config['entire_sharded']
    shards_config = config[Octopus.rails_env()]
  end

  shards_config ||= []

  shards_config.each do |key, value|
    if value.is_a?(String) && Octopus.rails32?
      value = resolve_string_connection(value).merge(:octopus_shard => key)
      initialize_adapter(value['adapter'])
      @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection")
    elsif value.is_a?(Hash) && value.has_key?("adapter")
      value.merge!(:octopus_shard => key)
      initialize_adapter(value['adapter'])
      @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection")
    elsif value.is_a?(Hash)
      @groups[key.to_s] = []

      value.each do |k, v|
        raise "You have duplicated shard names!" if @shards.has_key?(k.to_sym)

        initialize_adapter(v['adapter'])
        config_with_octopus_shard = v.merge(:octopus_shard => k)

        @shards[k.to_sym] = connection_pool_for(config_with_octopus_shard, "#{v['adapter']}_connection")
        @groups[key.to_s] << k.to_sym
      end
    end
  end
end

#last_current_shardObject



112
113
114
# File 'lib/octopus/proxy.rb', line 112

def last_current_shard
  Thread.current["octopus.last_current_shard"]
end

#last_current_shard=(last_current_shard) ⇒ Object



116
117
118
# File 'lib/octopus/proxy.rb', line 116

def last_current_shard=(last_current_shard)
  Thread.current["octopus.last_current_shard"] = last_current_shard
end

#respond_to?(method, include_private = false) ⇒ Boolean

Returns:

  • (Boolean)


222
223
224
# File 'lib/octopus/proxy.rb', line 222

def respond_to?(method, include_private = false)
  super || select_connection.respond_to?(method, include_private)
end

#run_queries_on_shard(shard, &block) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/octopus/proxy.rb', line 167

def run_queries_on_shard(shard, &block)
  older_shard = self.current_shard
  last_block = self.block

  begin
    self.block = true
    self.current_shard = shard
    yield
  ensure
    self.block = last_block || false
    self.current_shard = older_shard
  end
end

#safe_connection(connection_pool) ⇒ Object

Rails 3.1 sets automatic_reconnect to false when it removes connection pool. Octopus can potentially retain a reference to a closed connection pool. Previously, that would work since the pool would just reconnect, but in Rails 3.1 the flag prevents this.



147
148
149
150
151
152
# File 'lib/octopus/proxy.rb', line 147

def safe_connection(connection_pool)
  if Octopus.rails31? || Octopus.rails32?
    connection_pool.automatic_reconnect ||= true
  end
  connection_pool.connection()
end

#select_connectionObject



154
155
156
157
# File 'lib/octopus/proxy.rb', line 154

def select_connection
  @shards[shard_name].verify_active_connections! if @verify_connection
  safe_connection(@shards[shard_name])
end

#send_queries_to_multiple_shards(shards, &block) ⇒ Object



181
182
183
184
185
# File 'lib/octopus/proxy.rb', line 181

def send_queries_to_multiple_shards(shards, &block)
  shards.each do |shard|
    self.run_queries_on_shard(shard, &block)
  end
end

#shard_nameObject



159
160
161
# File 'lib/octopus/proxy.rb', line 159

def shard_name
  current_shard.is_a?(Array) ? current_shard.first : current_shard
end

#shard_namesObject

Public: Retrieves names of all loaded shards.

Returns an array of shard names as symbols



131
132
133
# File 'lib/octopus/proxy.rb', line 131

def shard_names
  @shards.keys
end

#shards_for_group(group) ⇒ Object

Public: Retrieves the defined shards for a given group.

Returns an array of shard names as symbols or nil if the group is not defined.



139
140
141
# File 'lib/octopus/proxy.rb', line 139

def shards_for_group(group)
  @groups.fetch(group.to_s, nil)
end

#should_clean_table_name?Boolean

Returns:

  • (Boolean)


163
164
165
# File 'lib/octopus/proxy.rb', line 163

def should_clean_table_name?
  @adapters.size > 1
end

#transaction(options = {}, &block) ⇒ Object



199
200
201
202
203
204
205
206
207
# File 'lib/octopus/proxy.rb', line 199

def transaction(options = {}, &block)
  if @replicated && (current_model.replicated || @fully_replicated)
    self.run_queries_on_shard(:master) do
      select_connection.transaction(options, &block)
    end
  else
    select_connection.transaction(options, &block)
  end
end