Class: ActiveRecord::ConnectionAdapters::MasterSlaveAdapter

Inherits:
AbstractAdapter
  • Object
show all
Defined in:
lib/master_slave_adapter.rb

Defined Under Namespace

Classes: Clock

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from AbstractAdapter

#log_info, #orig_log_info

Constructor Details

#initialize(config, logger) ⇒ MasterSlaveAdapter

Returns a new instance of MasterSlaveAdapter.



123
124
125
126
127
128
129
130
131
132
133
# File 'lib/master_slave_adapter.rb', line 123

def initialize(config, logger)
  super(nil, logger)

  @connections = {}
  @connections[:master] = connect(config.fetch(:master), :master)
  @connections[:slaves] = config.fetch(:slaves).map { |cfg| connect(cfg, :slave) }

  @disable_connection_test = config.delete(:disable_connection_test) == 'true'

  self.current_connection = slave_connection!
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(name, *args, &blk) ⇒ Object

ok, we might have missed more



299
300
301
302
303
304
305
306
307
308
309
# File 'lib/master_slave_adapter.rb', line 299

def method_missing(name, *args, &blk)
  master_connection.send(name.to_sym, *args, &blk).tap do
    @logger.try(:warn, %Q{
      You called the unsupported method '#{name}' on #{self.class.name}.
      In order to help us improve master_slave_adapter, please report this
      to: https://github.com/soundcloud/master_slave_adapter/issues

      Thank you.
    })
  end
end

Class Method Details

.reset!Object



181
182
183
184
185
186
187
# File 'lib/master_slave_adapter.rb', line 181

def reset!
  Thread.current[:master_slave_clock]      =
  Thread.current[:master_slave_connection] =
  Thread.current[:on_commit_callbacks]     =
  Thread.current[:on_rollback_callbacks]   =
  nil
end

.with_consistency(clock, &blk) ⇒ Object



178
179
180
# File 'lib/master_slave_adapter.rb', line 178

def with_consistency(clock, &blk)
  ActiveRecord::Base.with_consistency(clock, &blk)
end

.with_master(&blk) ⇒ Object



172
173
174
# File 'lib/master_slave_adapter.rb', line 172

def with_master(&blk)
  ActiveRecord::Base.with_master(&blk)
end

.with_slave(&blk) ⇒ Object



175
176
177
# File 'lib/master_slave_adapter.rb', line 175

def with_slave(&blk)
  ActiveRecord::Base.with_slave(&blk)
end

Instance Method Details

#active?Boolean

Returns:

  • (Boolean)


215
216
217
218
# File 'lib/master_slave_adapter.rb', line 215

def active?
  return true if @disable_connection_test
  connections.map { |c| c.active? }.all?
end

#cache(&block) ⇒ Object



232
233
234
235
236
# File 'lib/master_slave_adapter.rb', line 232

def cache(&block)
  connections.inject(block) do |block, connection|
    lambda { connection.cache(&block) }
  end.call
end

#clear_query_cacheObject



244
245
246
# File 'lib/master_slave_adapter.rb', line 244

def clear_query_cache
  connections.each { |connection| connection.clear_query_cache }
end

#commit_db_transactionObject



204
205
206
207
# File 'lib/master_slave_adapter.rb', line 204

def commit_db_transaction
  on_write { |conn| conn.commit_db_transaction }
  on_commit_callbacks.shift.call(current_clock) until on_commit_callbacks.blank?
end

#connectionsObject



342
343
344
# File 'lib/master_slave_adapter.rb', line 342

def connections
  @connections.values.inject([]) { |m,c| m << c }.flatten.compact
end

#current_clockObject



354
355
356
# File 'lib/master_slave_adapter.rb', line 354

def current_clock
  Thread.current[:master_slave_clock]
end

#current_clock=(clock) ⇒ Object



358
359
360
# File 'lib/master_slave_adapter.rb', line 358

def current_clock=(clock)
  Thread.current[:master_slave_clock] = clock
end

#current_connectionObject



346
347
348
# File 'lib/master_slave_adapter.rb', line 346

def current_connection
  connection_stack.first
end

#current_connection=(conn) ⇒ Object



350
351
352
# File 'lib/master_slave_adapter.rb', line 350

def current_connection=(conn)
  connection_stack.unshift conn
end

#delete(*args) ⇒ Object



200
201
202
# File 'lib/master_slave_adapter.rb', line 200

def delete(*args)
  on_write { |conn| conn.delete(*args) }
end

#disconnect!Object



224
225
226
# File 'lib/master_slave_adapter.rb', line 224

def disconnect!
  connections.each { |c| c.disconnect! }
end

#execute(*args) ⇒ Object

Someone calling execute directly on the connection is likely to be a write, respectively some DDL statement. People really shouldn’t do that, but let’s delegate this to master, just to be sure.



251
252
253
# File 'lib/master_slave_adapter.rb', line 251

def execute(*args)
  on_write { |conn| conn.execute(*args) }
end

#insert(*args) ⇒ Object

ADAPTER INTERFACE OVERRIDES ===========================================



192
193
194
# File 'lib/master_slave_adapter.rb', line 192

def insert(*args)
  on_write { |conn| conn.insert(*args) }
end

#master_clockObject



362
363
364
365
366
367
# File 'lib/master_slave_adapter.rb', line 362

def master_clock
  conn = master_connection
  if status = conn.uncached { conn.select_one("SHOW MASTER STATUS") }
    Clock.new(status['File'], status['Position'])
  end
end

#master_connectionObject

UTIL ==================================================================



332
333
334
# File 'lib/master_slave_adapter.rb', line 332

def master_connection
  @connections[:master]
end

#on_commit(&blk) ⇒ Object



161
162
163
# File 'lib/master_slave_adapter.rb', line 161

def on_commit(&blk)
  on_commit_callbacks.push blk
end

#on_rollback(&blk) ⇒ Object



165
166
167
# File 'lib/master_slave_adapter.rb', line 165

def on_rollback(&blk)
  on_rollback_callbacks.push blk
end

#reconnect!Object



220
221
222
# File 'lib/master_slave_adapter.rb', line 220

def reconnect!
  connections.each { |c| c.reconnect! }
end

#reset!Object



228
229
230
# File 'lib/master_slave_adapter.rb', line 228

def reset!
  connections.each { |c| c.reset! }
end

#rollback_db_transactionObject



209
210
211
212
213
# File 'lib/master_slave_adapter.rb', line 209

def rollback_db_transaction
  on_commit_callbacks.clear
  with(master_connection) { |conn| conn.rollback_db_transaction }
  on_rollback_callbacks.shift.call until on_rollback_callbacks.blank?
end

#slave_clock(conn) ⇒ Object



369
370
371
372
373
374
375
# File 'lib/master_slave_adapter.rb', line 369

def slave_clock(conn)
  if status = conn.uncached { conn.select_one("SHOW SLAVE STATUS") }
    Clock.new(status['Relay_Master_Log_File'], status['Exec_Master_Log_Pos']).tap do |c|
      set_last_seen_slave_clock(conn, c)
    end
  end
end

#slave_connection!Object

Returns a random slave connection Note: the method is not referentially transparent, hence the bang



338
339
340
# File 'lib/master_slave_adapter.rb', line 338

def slave_connection!
  @connections[:slaves].sample
end

#slave_consistent?(conn, clock) ⇒ Boolean

Returns:

  • (Boolean)


377
378
379
380
# File 'lib/master_slave_adapter.rb', line 377

def slave_consistent?(conn, clock)
  get_last_seen_slave_clock(conn).try(:>=, clock) ||
    slave_clock(conn).try(:>=, clock)
end

#uncached(&block) ⇒ Object



238
239
240
241
242
# File 'lib/master_slave_adapter.rb', line 238

def uncached(&block)
  connections.inject(block) do |block, connection|
    lambda { connection.uncached(&block) }
  end.call
end

#update(*args) ⇒ Object



196
197
198
# File 'lib/master_slave_adapter.rb', line 196

def update(*args)
  on_write { |conn| conn.update(*args) }
end

#with_consistency(clock) ⇒ Object

Raises:

  • (ArgumentError)


145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/master_slave_adapter.rb', line 145

def with_consistency(clock)
  raise ArgumentError, "consistency cannot be nil" if clock.nil?
  # try random slave, else fall back to master
  slave = slave_connection!
  conn =
    if !open_transaction? && slave_consistent?(slave, clock)
      slave
    else
      master_connection
    end

  with(conn) { yield }

  self.current_clock || clock
end

#with_masterObject

MASTER SLAVE ADAPTER INTERFACE ========================================



137
138
139
# File 'lib/master_slave_adapter.rb', line 137

def with_master
  with(master_connection) { yield }
end

#with_slaveObject



141
142
143
# File 'lib/master_slave_adapter.rb', line 141

def with_slave
  with(slave_connection!) { yield }
end