Class: Jetpants::DB

Inherits:
Object show all
Includes:
CallbackHandler
Defined in:
lib/jetpants/db.rb,
lib/jetpants/db/state.rb,
lib/jetpants/db/client.rb,
lib/jetpants/db/schema.rb,
lib/jetpants/db/server.rb,
lib/jetpants/db/privileges.rb,
lib/jetpants/db/replication.rb,
lib/jetpants/db/import_export.rb

Overview

– Import, export, and data set methods ####################################### ++

Constant Summary collapse

@@all_dbs =

We keep track of DB instances to prevent DB.new from every returning duplicates.

{}
@@all_dbs_mutex =
Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from CallbackHandler

included

Constructor Details

#initialize(ip, port = 3306) ⇒ DB

Returns a new instance of DB.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/jetpants/db.rb', line 56

def initialize(ip, port=3306)
  @ip, @port = ip, port.to_i
  @host = Host.new(ip)
  
  # These get set upon DB#probe being run
  @master = nil
  @slaves = nil
  @repl_paused = nil
  @running = nil
  
  # These get set upon DB#connect being run
  @user = nil
  @schema = nil
  
  # This is ephemeral, only known to Jetpants if you previously called
  # DB#start_mysql or DB#restart_mysql in this process
  @options = []
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(name, *args, &block) ⇒ Object

Jetpants::DB delegates missing methods to its Jetpants::Host.



78
79
80
81
82
83
84
# File 'lib/jetpants/db.rb', line 78

def method_missing(name, *args, &block)
  if @host.respond_to? name
    @host.send name, *args, &block
  else
    super
  end
end

Instance Attribute Details

#hostObject (readonly)

Jetpants::Host object that this MySQL instance runs on.



27
28
29
# File 'lib/jetpants/db.rb', line 27

def host
  @host
end

#ipObject (readonly)

IP address (as a string) of the MySQL instance



17
18
19
# File 'lib/jetpants/db.rb', line 17

def ip
  @ip
end

#portObject (readonly)

Port number of the MySQL instance. The base Jetpants implementation only supports port 3306, since this is necessary to crawl a replication hierarchy using SHOW PROCESSLIST, which does not include slave port numbers. However, plugins may override this behavior to support nonstandard ports and multi-instance-per-host topologies.



24
25
26
# File 'lib/jetpants/db.rb', line 24

def port
  @port
end

Class Method Details

.clearObject



34
35
36
# File 'lib/jetpants/db.rb', line 34

def self.clear
  @@all_dbs_mutex.synchronize {@@all_dbs = {}}
end

.new(ip, port = 3306) ⇒ Object

We override DB.new so that attempting to create a duplicate DB object (that is, one with the same IP and port as an existing DB object) returns the original object.



46
47
48
49
50
51
52
53
54
# File 'lib/jetpants/db.rb', line 46

def self.new(ip, port=3306)
  ip, embedded_port = ip.split(':', 2)
  port = embedded_port.to_i if embedded_port
  addr = "#{ip}:#{port}"
  @@all_dbs_mutex.synchronize do
    @@all_dbs[addr] = nil unless @@all_dbs[addr].is_a? self
    @@all_dbs[addr] ||= super
  end
end

.random_password(length = 50) ⇒ Object

Generate and return a random string consisting of uppercase letters, lowercase letters, and digits.



120
121
122
123
# File 'lib/jetpants/db/privileges.rb', line 120

def self.random_password(length=50)
  chars = [('a'..'z'), ('A'..'Z'), (0..9)].map(&:to_a).flatten
  (1..length).map{ chars[rand(chars.length)] }.join
end

Instance Method Details

#<=>(other) ⇒ Object

DB objects are sorted as strings, ie, by calling to_s



114
115
116
# File 'lib/jetpants/db.rb', line 114

def <=> other
  to_s <=> other.to_s
end

#ahead_of?(node) ⇒ Boolean

Return true if this node’s replication progress is ahead of the provided node, or false otherwise. The nodes must be in the same pool for coordinates to be comparable. Does not work in hierarchical replication scenarios!

Returns:

  • (Boolean)


287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/jetpants/db/replication.rb', line 287

def ahead_of?(node)
  my_pool = pool(true)
  raise "Node #{node} is not in the same pool as #{self}" unless node.pool(true) == my_pool
  
  my_coords   = (my_pool.master == self ? binlog_coordinates      : repl_binlog_coordinates)
  node_coords = (my_pool.master == node ? node.binlog_coordinates : node.repl_binlog_coordinates)
  
  # Same coordinates
  if my_coords == node_coords
    false
  
  # Same logfile: simply compare position
  elsif my_coords[0] == node_coords[0]
    my_coords[1] > node_coords[1]
    
  # Different logfile
  else
    my_logfile_num = my_coords[0].match(/^[a-zA-Z.0]+(\d+)$/)[1].to_i
    node_logfile_num = node_coords[0].match(/^[a-zA-Z.0]+(\d+)$/)[1].to_i
    my_logfile_num > node_logfile_num
  end
end

#alter_schemataObject

Has no built-in effect. Plugins can override this and/or use before_alter_schemata and after_alter_schemata callbacks to provide an implementation. Also sometimes useful to override this as a singleton method on specific DB objects in a migration script.



34
35
# File 'lib/jetpants/db/import_export.rb', line 34

def alter_schemata
end

#app_credentialsObject

Returns a hash containing :user and :pass indicating how the application connects to this database instance. By default this just delegates to Jetpants.application_credentials, which obtains credentials from the Jetpants config file. Plugins may override this to use different credentials for particular hosts or in certain situations.



101
102
103
# File 'lib/jetpants/db/client.rb', line 101

def app_credentials
  Jetpants.app_credentials
end

#app_schemaObject

Returns the schema name (“database name” in MySQL parlance) to use for connections. Defaults to just calling Jetpants.mysql_schema, but plugins may override.



107
108
109
# File 'lib/jetpants/db/client.rb', line 107

def app_schema
  Jetpants.mysql_schema
end

#binlog_coordinates(display_info = true) ⇒ Object

Returns a two-element array containing [log file name, position] for this database. Only useful when called on a master. This is the current instance’s own binlog coordinates, NOT the coordinates of replication progress on a slave!



190
191
192
193
194
195
# File 'lib/jetpants/db/replication.rb', line 190

def binlog_coordinates(display_info=true)
  hash = mysql_root_cmd('SHOW MASTER STATUS', :parse=>true)
  raise "Cannot obtain binlog coordinates of this master becaues binary logging is not enabled" unless hash[:file]
  output "Own binlog coordinates are (#{hash[:file]}, #{hash[:position].to_i})." if display_info
  [hash[:file], hash[:position].to_i]
end

#catch_up_to_master(timeout = 3600, threshold = 3, poll_frequency = 5) ⇒ Object

Waits for this instance’s SECONDS_BEHIND_MASTER to reach 0 and stay at 0 after repeated polls (based on threshold and poll_frequency). Will raise an exception if this has not happened within the timeout period, in seconds. In other words, with default settings: checks slave lag every 5+ sec, and returns true if slave lag is zero 3 times in a row. Gives up if this does not occur within a one-hour period. If a large amount of slave lag is reported, this method will automatically reduce its polling frequency.



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/jetpants/db/replication.rb', line 212

def catch_up_to_master(timeout=3600, threshold=3, poll_frequency=5)
  raise "This instance is not a slave" unless master
  resume_replication if @repl_paused
  
  times_at_zero = 0
  start = Time.now.to_i
  output "Waiting to catch up to master"
  while (Time.now.to_i - start) < timeout
    lag = seconds_behind_master
    if lag == 0
      times_at_zero += 1
      if times_at_zero >= threshold
        output "Caught up to master."
        return true
      end
      sleep poll_frequency
    elsif lag.nil?
      resume_replication
      sleep 1
      raise "Unable to restart replication" if seconds_behind_master.nil?
    else
      output "Currently #{lag} seconds behind master."
      times_at_zero = 0
      extra_sleep_time = (lag > 30000 ? 300 : (seconds_behind_master / 100).ceil)
      sleep poll_frequency + extra_sleep_time
    end
  end
  raise "This instance did not catch up to its master within #{timeout} seconds"
end

#change_master_to(new_master, option_hash = {}) ⇒ Object

Changes the master for this instance. Supply a Jetpants::DB indicating the new master, along with options :log_pos, :log_file, :user, :password. Does NOT automatically start replication afterwards on self!

If you omit :log_pos or :log_file, uses the current position/file from new_master, though this is only safe if new_master is not receiving writes!

If you omit :user and :password, tries obtaining replication credentials from the current node (assuming it is already a slave) or if that fails then from the global settings.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/jetpants/db/replication.rb', line 18

def change_master_to(new_master, option_hash={})
  return disable_replication! unless new_master   # change_master_to(nil) alias for disable_replication!
  return if new_master == master                  # no change
  
  logfile = option_hash[:log_file]
  pos     = option_hash[:log_pos]
  if !(logfile && pos)
    raise "Cannot use coordinates of a new master that is receiving updates" if new_master.master && ! new_master.repl_paused?
    logfile, pos = new_master.binlog_coordinates
  end
  
  repl_user = option_hash[:user]     || replication_credentials[:user]
  repl_pass = option_hash[:password] || replication_credentials[:pass]

  pause_replication if @master && !@repl_paused
  result = mysql_root_cmd "CHANGE MASTER TO " +
    "MASTER_HOST='#{new_master.ip}', " +
    "MASTER_PORT=#{new_master.port}, " +
    "MASTER_LOG_FILE='#{logfile}', " +
    "MASTER_LOG_POS=#{pos}, " +
    "MASTER_USER='#{repl_user}', " + 
    "MASTER_PASSWORD='#{repl_pass}'"
  
  output "Changing master to #{new_master} with coordinates (#{logfile}, #{pos}). #{result}"
  @master.slaves.delete(self) if @master rescue nil
  @master = new_master
  @repl_paused = true
  new_master.slaves << self
end

#clone_to!(*targets) ⇒ Object

Copies mysql db files from self to one or more additional DBs. WARNING: temporarily shuts down mysql on self, and WILL OVERWRITE CONTENTS OF MYSQL DIRECTORY ON TARGETS. Confirms first that none of the targets have over 100MB of data in the schema directory or in ibdata1. MySQL is restarted on source and targets afterwards.



320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# File 'lib/jetpants/db/import_export.rb', line 320

def clone_to!(*targets)
  targets.flatten!
  raise "Cannot clone an instance onto its master" if master && targets.include?(master)
  destinations = {}
  targets.each do |t| 
    destinations[t] = t.mysql_directory
    raise "Over 100 MB of existing MySQL data on target #{t}, aborting copy!" if t.data_set_size > 100000000
  end
  [self, targets].flatten.concurrent_each {|t| t.stop_query_killer; t.stop_mysql}
  targets.concurrent_each {|t| t.ssh_cmd "rm -rf #{t.mysql_directory}/ib_logfile*"}
  
  # Construct the list of files and dirs to copy. We include ib_lru_dump if present
  # (ie, if using Percona Server with innodb_buffer_pool_restore_at_startup enabled)
  # since this will greatly improve warm-up time of the cloned nodes
  files = ['ibdata1', 'mysql', 'test', app_schema]
  files << 'ib_lru_dump' if ssh_cmd("test -f #{mysql_directory}/ib_lru_dump 2>/dev/null; echo $?").chomp.to_i == 0
  
  fast_copy_chain(mysql_directory, 
                  destinations,
                  port: 3306,
                  files: files,
                  overwrite: true)
  [self, targets].flatten.concurrent_each {|t| t.start_mysql; t.start_query_killer}
end

#confirm_listening(timeout = 10) ⇒ Object

Confirms that a process is listening on the DB’s port



86
87
88
89
90
91
92
93
# File 'lib/jetpants/db/server.rb', line 86

def confirm_listening(timeout=10)
  if @options.include? '--skip-networking'
    output 'Unable to confirm mysqld listening because server started with --skip-networking'
    false
  else
    confirm_listening_on_port(@port, timeout)
  end
end

#connect(options = {}) ⇒ Object

Returns a Sequel database object for use in sending queries to the DB remotely. Initializes (or re-initializes) the connection pool upon first use or upon requesting a different user or schema. Note that we only maintain one connection pool per DB. Valid options include :user, :pass, :schema, :max_conns, :after_connect or omit these to use defaults.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/jetpants/db/client.rb', line 46

def connect(options={})
  if @options.include? '--skip-networking'
    output 'Skipping connection attempt because server started with --skip-networking'
    return nil
  end
  
  options[:user]    ||= app_credentials[:user]
  options[:schema]  ||= app_schema
  
  return @db if @db && @user == options[:user] && @schema == options[:schema]
  
  disconnect if @db
  
  @db = Sequel.connect(
    :adapter          =>  'mysql2',
    :host             =>  @ip,
    :port             =>  @port,
    :user             =>  options[:user],
    :password         =>  options[:pass] || app_credentials[:pass],
    :database         =>  options[:schema],
    :max_connections  =>  options[:max_conns] || Jetpants.max_concurrency,
    :after_connect    =>  options[:after_connect] )
  @user = options[:user]
  @schema = options[:schema]
  @db.convert_tinyint_to_bool = false
  @db
end

#connectionObject Also known as: mysql

Returns a Sequel database object representing the current connection. If no current connection, this will automatically connect with default options.



92
93
94
# File 'lib/jetpants/db/client.rb', line 92

def connection
  @db || connect
end

#create_user(username = false, password = false, skip_binlog = false) ⇒ Object

Create a MySQL user. If you omit parameters, the defaults from Jetpants’ configuration will be used instead. Does not automatically grant any privileges; use DB#grant_privileges for that. Intentionally cannot create a passwordless user.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/jetpants/db/privileges.rb', line 12

def create_user(username=false, password=false, skip_binlog=false)
  username ||= app_credentials[:user]
  password ||= app_credentials[:pass]
  commands = []
  commands << 'SET sql_log_bin = 0' if skip_binlog
  Jetpants.mysql_grant_ips.each do |ip|
    commands << "CREATE USER '#{username}'@'#{ip}' IDENTIFIED BY '#{password}'"
  end
  commands << "FLUSH PRIVILEGES"
  commands = commands.join '; '
  mysql_root_cmd commands, schema: true
  Jetpants.mysql_grant_ips.each do |ip|
    message = "Created user '#{username}'@'#{ip}'"
    message += ' (only on this node - skipping binlog!)' if skip_binlog
    output message
  end
end

#data_set_size(in_gb = false) ⇒ Object

Returns the data set size in bytes (if in_gb is false or omitted) or in gigabytes (if in_gb is true). Note that this is actually in gibibytes (2^30) rather than a metric gigabyte. This puts it on the same scale as the output to tools like “du -h” and “df -h”.



280
281
282
283
# File 'lib/jetpants/db/state.rb', line 280

def data_set_size(in_gb=false)
  bytes = dir_size("#{mysql_directory}/#{app_schema}") + dir_size("#{mysql_directory}/ibdata1")
  in_gb ? (bytes / 1073741824.0).round : bytes
end

#delete_table_data_outside_range(table, keep_min_id, keep_max_id, direction) ⇒ Object

Helper method used by prune_data_to_range. Deletes data for the given table that falls either below the supplied keep_min_id (if direction is :desc) or falls above the supplied keep_max_id (if direction is :asc).



229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/jetpants/db/import_export.rb', line 229

def delete_table_data_outside_range(table, keep_min_id, keep_max_id, direction)
  rows_deleted = 0
  
  if direction == :asc
    dir_english = "Ascending"
    boundary = keep_max_id
    output "Removing rows with ID > #{boundary}", table
  elsif direction == :desc
    dir_english = "Descending"
    boundary = keep_min_id
    output "Removing rows with ID < #{boundary}", table
  else
    raise "Unknown order parameter #{order}"
  end
  
  table.sharding_keys.each do |col|
    deleter_sql = table.sql_cleanup_delete(col, keep_min_id, keep_max_id)
    
    id = boundary
    iter = 0
    while id do
      finder_sql = table.sql_cleanup_next_id(col, id, direction)
      id = query_return_first_value(finder_sql)
      break unless id
      rows_deleted += query(deleter_sql, id)
      
      # Slow down on multi-col sharding key tables, due to queries being far more expensive
      sleep(0.0001) if table.sharding_keys.size > 1
      
      iter += 1
      output("#{dir_english} deletion progress: through #{col} #{id}, deleted #{rows_deleted} rows so far", table) if iter % 50000 == 0
    end
  end
  rows_deleted
end

#detect_table_schema(table_name) ⇒ Object

Query the database for information about the table schema, including primary key, create statement, and columns



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/jetpants/db/schema.rb', line 12

def detect_table_schema(table_name)
  table_sql = "SHOW CREATE TABLE `#{table_name}`"
  create_statement = query_return_first(table_sql).values.last
  pk_sql = "SHOW INDEX IN #{table_name} WHERE Key_name = 'PRIMARY'"
  pk_fields = query_return_array(pk_sql)
  pk_fields.sort_by!{|pk| pk[:Seq_in_index]}

  params = {
    'primary_key' => pk_fields.map{|pk| pk[:Column_name] },
    'create_table' => create_statement,
    'indexes' => connection.indexes(table_name),
    'pool' => pool,
    'columns' => connection.schema(table_name).map{|schema| schema[0]} 
  }

  Table.new(table_name, params)
end

#disable_binary_loggingObject

This method is no longer supported. It used to manipulate /etc/my.cnf directly, which was too brittle. You can achieve the same effect by passing parameters to DB#restart_mysql.



274
275
276
# File 'lib/jetpants/db/replication.rb', line 274

def disable_binary_logging
  raise "DB#disable_binary_logging is no longer supported, please use DB#restart_mysql('--skip-log-bin', '--skip-log-slave-updates') instead"
end

#disable_monitoring(*services) ⇒ Object

Has no built-in effect. Plugins can override it, and/or implement before_disable_monitoring and after_disable_monitoring callbacks.



108
109
# File 'lib/jetpants/db/server.rb', line 108

def disable_monitoring(*services)
end

#disable_read_only!Object

Disables global read-only mode on the database.



107
108
109
110
111
112
113
114
115
116
# File 'lib/jetpants/db/privileges.rb', line 107

def disable_read_only!
  if read_only?
    output "Disabling read_only mode"
    mysql_root_cmd 'SET GLOBAL read_only = 0'
    not read_only?
  else
    output "Confirmed that read_only mode is already disabled"
    true
  end
end

#disable_replication!Object Also known as: reset_replication!

Permanently disables replication. Clears out the SHOW SLAVE STATUS output entirely in MySQL versions that permit this.



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/jetpants/db/replication.rb', line 93

def disable_replication!
  stop_replication
  output "Disabling replication; this db is no longer a slave."
  ver = version_tuple
  
  # MySQL < 5.5: allows master_host='', which clears out SHOW SLAVE STATUS
  if ver[0] == 5 && ver[1] < 5
    output mysql_root_cmd "CHANGE MASTER TO master_host=''; RESET SLAVE"
  
  # MySQL 5.5.16+: allows RESET SLAVE ALL, which clears out SHOW SLAVE STATUS
  elsif ver[0] >= 5 && (ver[0] > 5 || ver[1] >= 5) && (ver[0] > 5 || ver[1] > 5 || ver[2] >= 16)
    output mysql_root_cmd "CHANGE MASTER TO master_user='test'; RESET SLAVE ALL"
  
  # Other versions: no safe way to clear out SHOW SLAVE STATUS.  Still set master_user to 'test'
  # so that we know to ignore the slave status output.
  else
    output mysql_root_cmd "CHANGE MASTER TO master_user='test'; RESET SLAVE"
  end
  
  @master.slaves.delete(self) rescue nil
  @master = nil
  @repl_paused = nil
end

#disconnectObject

Closes the database connection(s) in the connection pool.



75
76
77
78
79
80
81
82
# File 'lib/jetpants/db/client.rb', line 75

def disconnect
  if @db
    @db.disconnect rescue nil
    @db = nil
  end
  @user = nil
  @schema = nil
end

#drop_user(username = false, skip_binlog = false) ⇒ Object

Drops a user. Can optionally make this statement skip replication, if you want to drop a user on master and not on its slaves.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/jetpants/db/privileges.rb', line 32

def drop_user(username=false, skip_binlog=false)
  username ||= app_credentials[:user]
  commands = []
  commands << 'SET sql_log_bin = 0' if skip_binlog
  Jetpants.mysql_grant_ips.each do |ip|
    commands << "DROP USER '#{username}'@'#{ip}'"
  end
  commands << "FLUSH PRIVILEGES"
  commands = commands.join '; '
  mysql_root_cmd commands, schema: true
  Jetpants.mysql_grant_ips.each do |ip|
    message = "Dropped user '#{username}'@'#{ip}'"
    message += ' (only on this node - skipping binlog!)' if skip_binlog
    output message
  end
end

#enable_binary_loggingObject

This method is no longer supported. It used to manipulate /etc/my.cnf directly, which was too brittle. You can achieve the same effect by passing (or NOT passing) parameters to DB#restart_mysql.



280
281
282
# File 'lib/jetpants/db/replication.rb', line 280

def enable_binary_logging
  raise "DB#enable_binary_logging is no longer supported, please use DB#restart_mysql() instead"
end

#enable_monitoring(*services) ⇒ Object

Has no built-in effect. Plugins can override it, and/or implement before_enable_monitoring and after_enable_monitoring callbacks.



103
104
# File 'lib/jetpants/db/server.rb', line 103

def enable_monitoring(*services)
end

#enable_read_only!Object

Enables global read-only mode on the database.



95
96
97
98
99
100
101
102
103
104
# File 'lib/jetpants/db/privileges.rb', line 95

def enable_read_only!
  if read_only?
    output "Node already has read_only mode enabled"
    true
  else
    output "Enabling read_only mode"
    mysql_root_cmd 'SET GLOBAL read_only = 1'
    read_only?
  end
end

#enslave!(targets, repl_user = false, repl_pass = false) ⇒ Object

Wipes out the target instances and turns them into slaves of self. Resumes replication on self afterwards, but does NOT automatically start replication on the targets. You can omit passing in the replication user/pass if this machine is itself a slave OR already has at least one slave OR the global setting is fine to use here. Warning: takes self offline during the process, so don’t use on a master that is actively in use by your application!



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/jetpants/db/replication.rb', line 125

def enslave!(targets, repl_user=false, repl_pass=false)
  repl_user ||= replication_credentials[:user]
  repl_pass ||= replication_credentials[:pass]
  disable_monitoring
  targets.each {|t| t.disable_monitoring}
  pause_replication if master && ! @repl_paused
  file, pos = binlog_coordinates
  clone_to!(targets)
  targets.each do |t|
    t.enable_monitoring
    t.change_master_to( self, 
                        log_file: file, 
                        log_pos:  pos, 
                        user:     repl_user, 
                        password: repl_pass  )
    t.enable_read_only!
  end
  resume_replication if @master # should already have happened from the clone_to! restart anyway, but just to be explicit
  enable_monitoring
end

#enslave_sibling!(target) ⇒ Object

Shortcut to call DB#enslave_siblings! on a single target



172
173
174
# File 'lib/jetpants/db/replication.rb', line 172

def enslave_sibling!(target)
  enslave_siblings!([target])
end

#enslave_siblings!(targets) ⇒ Object

Wipes out the target instances and turns them into slaves of self’s master. Resumes replication on self afterwards, but does NOT automatically start replication on the targets. Warning: takes self offline during the process, so don’t use on an active slave!



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/jetpants/db/replication.rb', line 150

def enslave_siblings!(targets)
  raise "Can only call enslave_siblings! on a slave instance" unless master
  disable_monitoring
  targets.each {|t| t.disable_monitoring}
  pause_replication unless @repl_paused
  file, pos = repl_binlog_coordinates
  clone_to!(targets)
  targets.each do |t| 
    t.enable_monitoring
    t.change_master_to( master, 
                        log_file: file,
                        log_pos:  pos,
                        user:     replication_credentials[:user],
                        password: replication_credentials[:pass]  )
    t.enable_read_only!
  end
  resume_replication # should already have happened from the clone_to! restart anyway, but just to be explicit
  catch_up_to_master
  enable_monitoring
end

#export_data(tables, min_id = false, max_id = false) ⇒ Object

Exports data for the supplied tables. If min/max ID supplied, only exports data where at least one of the table’s sharding keys falls within this range. Creates a ‘jetpants’ db user with FILE permissions for the duration of the export.



41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/jetpants/db/import_export.rb', line 41

def export_data(tables, min_id=false, max_id=false)
  pause_replication if @master && ! @repl_paused
  import_export_user = 'jetpants'
  create_user(import_export_user)
  grant_privileges(import_export_user)               # standard privs
  grant_privileges(import_export_user, '*', 'FILE')  # FILE global privs
  reconnect(user: import_export_user)
  @counts ||= {}
  tables.each {|t| @counts[t.name] = export_table_data t, min_id, max_id}
ensure
  reconnect(user: app_credentials[:user])
  drop_user import_export_user
end

#export_schemata(tables) ⇒ Object

Exports the DROP TABLE + CREATE TABLE statements for the given tables via mysqldump



9
10
11
12
13
14
15
16
17
# File 'lib/jetpants/db/import_export.rb', line 9

def export_schemata(tables)
  output 'Exporting table definitions'
  supply_root_pw = (Jetpants.mysql_root_password ? "-p#{Jetpants.mysql_root_password}" : '')
  supply_port = (@port == 3306 ? '' : "-h 127.0.0.1 -P #{@port}")
  cmd = "mysqldump #{supply_root_pw} #{supply_port} -d #{app_schema} " + tables.join(' ') + " >#{Jetpants.export_location}/create_tables_#{@port}.sql"
  cmd.untaint
  result = ssh_cmd(cmd)
  output result
end

#export_table_data(table, min_id = false, max_id = false) ⇒ Object

Exports data for a table. Only includes the data subset that falls within min_id and max_id. The export files will be located according to the export_location configuration setting. Returns the number of rows exported.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/jetpants/db/import_export.rb', line 59

def export_table_data(table, min_id=false, max_id=false)
  unless min_id && max_id && table.chunks > 0
    output "Exporting all data", table
    rows_exported = query(table.sql_export_all)
    output "#{rows_exported} rows exported", table
    return rows_exported
  end
  
  output "Exporting data for ID range #{min_id}..#{max_id}", table
  lock = Mutex.new
  rows_exported = 0
  chunks_completed = 0
  
  (min_id..max_id).in_chunks(table.chunks) do |min, max|
    attempts = 0
    begin
      sql = table.sql_export_range(min, max)
      result = query sql
      lock.synchronize do
        rows_exported += result
        chunks_completed += 1
        percent_finished = 100 * chunks_completed / table.chunks
        output("Export #{percent_finished}% complete.", table) if table.chunks >= 40 && chunks_completed % 20 == 0
      end
    rescue => ex
      if attempts >= 10
        output "EXPORT ERROR: #{ex.message}, chunk #{min}-#{max}, giving up", table
        raise
      end
      attempts += 1
      output "EXPORT ERROR: #{ex.message}, chunk #{min}-#{max}, attempt #{attempts}, re-trying after delay", table
      ssh_cmd("rm -f " + table.export_file_path(min, max))
      sleep(1.0 * attempts)
      retry
    end
  end
  output "#{rows_exported} rows exported", table
  rows_exported
end

#for_backups?Boolean

Jetpants supports a notion of dedicated backup machines, containing one or more MySQL instances that are considered “backup slaves”, which will never be promoted to serve production queries. The default implementation identifies these by a hostname beginning with “backup”. You may want to override this with a plugin to use a different scheme if your architecture contains a similar type of node.

Returns:

  • (Boolean)


155
156
157
# File 'lib/jetpants/db/state.rb', line 155

def for_backups?
  @host.hostname.start_with? 'backup'
end

#global_statusObject

Returns a hash mapping global MySQL status fields (as symbols) to their values (as strings).



191
192
193
194
195
196
# File 'lib/jetpants/db/state.rb', line 191

def global_status
  query_return_array('show global status').reduce do |variables, variable|
    variables[variable[:Variable_name].to_sym] = variable[:Value]
    variables      
  end
end

#global_variablesObject

Returns a hash mapping global MySQL variables (as symbols) to their values (as strings).



182
183
184
185
186
187
# File 'lib/jetpants/db/state.rb', line 182

def global_variables
  query_return_array('show global variables').reduce do |variables, variable|
    variables[variable[:Variable_name].to_sym] = variable[:Value]
    variables
  end
end

#grant_or_revoke_privileges(statement, username, database, privileges) ⇒ Object

Helper method that can do grants or revokes.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/jetpants/db/privileges.rb', line 64

def grant_or_revoke_privileges(statement, username, database, privileges)
  preposition = (statement.downcase == 'revoke' ? 'FROM' : 'TO')
  username ||= app_credentials[:user]
  database ||= app_schema
  privileges = Jetpants.mysql_grant_privs if privileges.empty?
  privileges = privileges.join(',')
  commands = []
  
  Jetpants.mysql_grant_ips.each do |ip|
    commands << "#{statement} #{privileges} ON #{database}.* #{preposition} '#{username}'@'#{ip}'"
  end
  commands << "FLUSH PRIVILEGES"
  commands = commands.join '; '
  mysql_root_cmd commands, schema: true
  Jetpants.mysql_grant_ips.each do |ip|
    verb = (statement.downcase == 'revoke' ? 'Revoking' : 'Granting')
    target_db = (database == '*' ? 'globally' : "on #{database}.*")
    output "#{verb} privileges #{preposition.downcase} '#{username}'@'#{ip}' #{target_db}: #{privileges.downcase}"
  end
end

#grant_privileges(username = false, database = false, *privileges) ⇒ Object

Grants privileges to the given username for the specified database. Pass in privileges as additional params, each as strings. You may omit parameters to use the defaults in the Jetpants config file.



52
53
54
# File 'lib/jetpants/db/privileges.rb', line 52

def grant_privileges(username=false, database=false, *privileges)
  grant_or_revoke_privileges('GRANT', username, database, privileges)
end

#has_slaves?Boolean

Returns true if this instance had at least one slave when it was last probed, false otherwise. (This method will indirectly force a probe if the instance hasn’t been probed before.)

Returns:

  • (Boolean)


87
88
89
# File 'lib/jetpants/db/state.rb', line 87

def has_slaves?
  slaves.count > 0
end

#has_table?(table) ⇒ Boolean

Deletages check for a table existing by name up to the pool

Returns:

  • (Boolean)


31
32
33
# File 'lib/jetpants/db/schema.rb', line 31

def has_table?(table)
  pool.has_table? table
end

#import_data(tables, min_id = false, max_id = false) ⇒ Object

Imports data for a table that was previously exported using export_data. Only includes the data subset that falls within min_id and max_id. If run after export_data (in the same process), import_data will automatically confirm that the import counts match the previous export counts.

Creates a ‘jetpants’ db user with FILE permissions for the duration of the import.

Note: import will be substantially faster if you disable binary logging before the import, and re-enable it after the import. You also must set InnoDB’s autoinc lock mode to 2 in order to do a chunked import with auto-increment tables. You can achieve all this by calling DB#restart_mysql ‘–skip-log-bin’, ‘–skip-log-slave-updates’, ‘–innodb-autoinc-lock-mode=2’ prior to importing data, and then clear those settings by calling DB#restart_mysql with no params after done importing data.



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/jetpants/db/import_export.rb', line 115

def import_data(tables, min_id=false, max_id=false)
  disable_read_only!
  import_export_user = 'jetpants'
  create_user(import_export_user)
  grant_privileges(import_export_user)               # standard privs
  grant_privileges(import_export_user, '*', 'FILE')  # FILE global privs
  
  # Disable unique checks upon connecting. This has to be done at the :after_connect level in Sequel
  # to guarantee it's being run on every connection in the conn pool. This is mysql2-specific.
  disable_unique_checks_proc = Proc.new {|mysql2_client| mysql2_client.query 'SET unique_checks = 0'}
  
  reconnect(user: import_export_user, after_connect: disable_unique_checks_proc)
  
  import_counts = {}
  tables.each {|t| import_counts[t.name] = import_table_data t, min_id, max_id}
  
  # Verify counts
  @counts ||= {}
  @counts.each do |name, exported|
    if exported == import_counts[name]
      output "Verified import count matches export count for table #{name}"
    else
      raise "Import count (#{import_counts[name]}) does not match export count (#{exported}) for table #{name}"
    end
  end
  
ensure
  reconnect(user: app_credentials[:user])
  drop_user(import_export_user)
end

#import_schemata!Object

Executes a .sql file previously created via export_schemata. Warning: this will DESTROY AND RECREATE any tables contained in the file. DO NOT USE ON A DATABASE THAT CONTAINS REAL DATA!!! This method doesn’t check first! The statements will replicate to any slaves! PROCEED WITH CAUTION IF RUNNING THIS MANUALLY!



24
25
26
27
28
# File 'lib/jetpants/db/import_export.rb', line 24

def import_schemata!
  output 'Dropping and re-creating table definitions'
  result = mysql_root_cmd "source #{Jetpants.export_location}/create_tables_#{@port}.sql", terminator: '', schema: true
  output result
end

#import_table_data(table, min_id = false, max_id = false) ⇒ Object

Imports the data subset previously dumped thorugh export_data. Returns number of rows imported.



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/jetpants/db/import_export.rb', line 148

def import_table_data(table, min_id=false, max_id=false)
  unless min_id && max_id && table.chunks > 0
    output "Importing all data", table
    rows_imported = query(table.sql_import_all)
    output "#{rows_imported} rows imported", table
    return rows_imported
  end
  
  output "Importing data for ID range #{min_id}..#{max_id}", table
  lock = Mutex.new
  rows_imported = 0
  chunks_completed = 0
  
  (min_id..max_id).in_chunks(table.chunks) do |min, max|
    attempts = 0
    begin
      sql = table.sql_import_range(min, max)
      result = query sql
      lock.synchronize do
        rows_imported += result
        chunks_completed += 1
        percent_finished = 100 * chunks_completed / table.chunks
        output("Import #{percent_finished}% complete.", table) if table.chunks >= 40 && chunks_completed % 20 == 0
        chunk_file_name = table.export_file_path(min, max)
        ssh_cmd "rm -f #{chunk_file_name}"
      end
    rescue => ex
      if attempts >= 10
        output "IMPORT ERROR: #{ex.message}, chunk #{min}-#{max}, giving up", table
        raise
      end
      attempts += 1
      output "IMPORT ERROR: #{ex.message}, chunk #{min}-#{max}, attempt #{attempts}, re-trying after delay", table
      sleep(3.0 * attempts)
      retry
    end
  end
  output "#{rows_imported} rows imported", table
  rows_imported
end

#is_slave?Boolean

Returns true if this instance has a master, false otherwise.

Returns:

  • (Boolean)


80
81
82
# File 'lib/jetpants/db/state.rb', line 80

def is_slave?
  !!master
end

#is_standby?Boolean

Returns true if this instance appears to be a standby slave, false otherwise. Note that “standby” in this case is based on whether the slave is actively receiving connections, not based on any Pool’s understanding of the slave’s state. An asset- tracker plugin may want to override this to determine standby status differently.

Returns:

  • (Boolean)


145
146
147
# File 'lib/jetpants/db/state.rb', line 145

def is_standby?
  !(running?) || (is_slave? && !taking_connections?)
end

#masterObject

Returns the Jetpants::DB instance that is the master of this instance, or false if there isn’t one, or nil if we can’t tell because this instance isn’t running.



10
11
12
13
14
# File 'lib/jetpants/db/state.rb', line 10

def master
  return nil unless running? || @master
  probe if @master.nil?
  @master
end

#max_threads_running(tries = 8, interval = 1.0) ⇒ Object

Gets the max theads connected over a time period



110
111
112
# File 'lib/jetpants/db/state.rb', line 110

def max_threads_running(tries=8, interval=1.0)
  poll_status_value(:Threads_running,:max, tries, interval)
end

#mount_stats(mount = false) ⇒ Object



285
286
287
288
289
# File 'lib/jetpants/db/state.rb', line 285

def mount_stats(mount=false)
  mount ||= mysql_directory

  host.mount_stats(mount)
end

#mysql_directoryObject

Returns the MySQL data directory for this instance. A plugin can override this if needed, especially if running multiple MySQL instances on the same host.



97
98
99
# File 'lib/jetpants/db/server.rb', line 97

def mysql_directory
  '/var/lib/mysql'
end

#mysql_root_cmd(cmd, options = {}) ⇒ Object

Runs the provided SQL statement as root, locally via an SSH command line, and returns the response as a single string. Available options:

:terminator

how to terminate the query, such as ‘G’ or ‘;’. (default: ‘G’)

:parse

parse a single-row, vertical-format result (:terminator must be ‘G’) and return it as a hash

:schema

name of schema to use, or true to use this DB’s default. This may have implications when used with filtered replication! (default: nil, meaning no schema)

:attempts

by default, queries will be attempted up to 3 times. set this to 0 or false for non-idempotent queries.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/jetpants/db/client.rb', line 15

def mysql_root_cmd(cmd, options={})
  terminator = options[:terminator] || '\G'
  attempts = (options[:attempts].nil? ? 3 : (options[:attempts].to_i || 1))
  schema = (options[:schema] == true ? app_schema : options[:schema])
  failures = 0
  
  begin
    raise "MySQL is not running" unless running?
    supply_root_pw = (Jetpants.mysql_root_password ? "-p#{Jetpants.mysql_root_password}" : '')
    supply_port = (@port == 3306 ? '' : "-h 127.0.0.1 -P #{@port}")
    real_cmd = %Q{mysql #{supply_root_pw} #{supply_port} -ss -e "#{cmd}#{terminator}" #{schema}}
    real_cmd.untaint
    result = ssh_cmd!(real_cmd)
    raise result if result && result.downcase.start_with?('error ')
    result = parse_vertical_result(result) if options[:parse] && terminator == '\G'
    return result
  rescue => ex
    failures += 1
    raise if failures >= attempts
    output "Root query \"#{cmd}\" failed: #{ex.message}, re-trying after delay"
    sleep 3 * failures
    retry
  end
end

#normalized_version(precision = 2) ⇒ Object

Return a string representing the version. The precision indicates how many major/minor version numbers to return. ie, on 5.5.29, normalized_version(3) returns ‘5.5.29’, normalized_version(2) returns ‘5.5’, and normalized_version(1) returns ‘5’



220
221
222
223
# File 'lib/jetpants/db/state.rb', line 220

def normalized_version(precision=2)
  raise "Invalid precision #{precision}" if precision < 1 || precision > 3
  version_tuple[0, precision].join('.')
end

#output(str, table = nil) ⇒ Object

Displays the provided output, along with information about the current time, self, and optionally a Jetpants::Table name.



102
103
104
105
106
107
108
109
110
111
# File 'lib/jetpants/db.rb', line 102

def output(str, table=nil)
  str = str.to_s.strip
  str = nil if str && str.length == 0
  str ||= "Completed (no output)"
  output = Time.now.strftime("%H:%M:%S") + " [#{self}] "
  output << table.name << ': ' if table
  output << str
  print output + "\n"
  output
end

#override_mysql_grant_ips(ips) ⇒ Object

override Jetpants.mysql_grant_ips temporarily before executing a block then set Jetpants.mysql_grant_ips back to the original values

eg. master.override_mysql_grant_ips(['10.10.10.10']) do
      #something
    end


130
131
132
133
134
135
136
137
138
139
140
# File 'lib/jetpants/db/privileges.rb', line 130

def override_mysql_grant_ips(ips)
  ip_holder = Jetpants.mysql_grant_ips
  Jetpants.mysql_grant_ips = ips
  begin
    yield
  rescue StandardError, Interrupt, IOError
    Jetpants.mysql_grant_ips = ip_holder
    raise
  end
  Jetpants.mysql_grant_ips = ip_holder
end

#parse_vertical_result(text) ⇒ Object

Parses the result of a MySQL query run with a G terminator. Useful when interacting with MySQL via the command-line client (for secure access to the root user) instead of via the MySQL protocol.



137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/jetpants/db/client.rb', line 137

def parse_vertical_result(text)
  results = {}
  return results unless text
  raise text.chomp if text =~ /^ERROR/
  lines = text.split("\n")
  lines.each do |line|
    col, val = line.split ':'
    next unless val
    results[col.strip.downcase.to_sym] = val.strip
  end
  results
end

#pause_replicationObject Also known as: stop_replication

Pauses replication



49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/jetpants/db/replication.rb', line 49

def pause_replication
  raise "This DB object has no master" unless master
  output "Pausing replication from #{@master}."
  if @repl_paused
    output "Replication was already paused."
    repl_binlog_coordinates(true)
  else
    output mysql_root_cmd "STOP SLAVE"
    repl_binlog_coordinates(true)
    @repl_paused = true
  end
end

#pause_replication_with(sibling) ⇒ Object

Stops replication at the same coordinates on two nodes



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/jetpants/db/replication.rb', line 74

def pause_replication_with(sibling)
  [self, sibling].each &:pause_replication
  
  # self and sibling at same coordinates: all done
  return true if repl_binlog_coordinates == sibling.repl_binlog_coordinates
  
  # self ahead of sibling: handle via recursion with roles swapped
  return sibling.pause_replication_with(self) if ahead_of? sibling
  
  # sibling ahead of self: catch up to sibling
  sibling_coords = sibling.repl_binlog_coordinates
  output "Resuming replication from #{@master} until (#{sibling_coords[0]}, #{sibling_coords[1]})."
  output(mysql_root_cmd "START SLAVE UNTIL MASTER_LOG_FILE = '#{sibling_coords[0]}', MASTER_LOG_POS = #{sibling_coords[1]}")
  sleep 1 while repl_binlog_coordinates != sibling_coords
  true
end

#poll_status_value(field, type = :max, tries = 8, interval = 1.0) ⇒ Object

Gets the max or avg for a mysql value



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/jetpants/db/state.rb', line 115

def poll_status_value(field, type=:max, tries=8, interval=1.0)
  max = 0
  sum = 0
  tries.times do
    value = global_status[field].to_i
    max = value unless max > value
    sum += value
    sleep(interval)
  end
  if type == :max
    max
  elsif type == :avg
    sum.to_f/tries.to_f
  end
end

#pool(create_if_missing = false) ⇒ Object

Returns the Jetpants::Pool that this instance belongs to, if any. Can optionally create an anonymous pool if no pool was found. This anonymous pool intentionally has a blank sync_configuration implementation.



241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/jetpants/db/state.rb', line 241

def pool(create_if_missing=false)
  result = Jetpants.topology.pool(self)
  if !result && master
    result ||= Jetpants.topology.pool(master)
  end
  if !result && create_if_missing
    pool_master = master || self
    result = Pool.new('anon_pool_' + pool_master.ip.tr('.', ''), pool_master)
    def result.sync_configuration; end
  end
  return result
end

#probe(force = false) ⇒ Object

Probes this instance to discover its status, master, and slaves. Several other methods trigger a probe automatically, including master, slaves, repl_paused?, and running?. Ordinarily this method won’t re-probe an instance that has already been probed, unless you pass force=true. This can be useful if something external to Jetpants has changed a DB’s state while Jetpants is running. For example, if you’re using jetpants console and, for whatever reason, you stop replication on a slave manually outside of Jetpants. In this case you will need to force a probe so that Jetpants learns about the change.



58
59
60
61
62
63
64
65
# File 'lib/jetpants/db/state.rb', line 58

def probe(force=false)
  return if probed? && !force
  output "Probing MySQL installation"
  probe_running
  probe_master
  probe_slaves
  self
end

#probe!Object

Alias for probe(true)



68
# File 'lib/jetpants/db/state.rb', line 68

def probe!() probe(true) end

#probed?Boolean

Returns true if we’ve probed this MySQL instance already. Several methods trigger a probe, including master, slaves, repl_paused?, and running?.

Returns:

  • (Boolean)


44
45
46
# File 'lib/jetpants/db/state.rb', line 44

def probed?
  [@master, @slaves, @running].compact.count >= 3
end

#promotable_to_master?(detect_version_mismatches = true) ⇒ Boolean

Returns true if the node can be promoted to be the master of its pool, false otherwise (also false if node is ALREADY the master) Don’t use this in hierarchical replication scenarios, result may be unexpected.

Returns:

  • (Boolean)


163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/jetpants/db/state.rb', line 163

def promotable_to_master?(detect_version_mismatches=true)
  # backup_slaves are non-promotable
  return false if for_backups?
  
  # already the master
  p = pool(true)
  return false if p.master == self
  
  # ordinarily, cannot promote a slave that's running a higher version of
  # MySQL than any other node in the pool.
  if detect_version_mismatches
    p.nodes.all? {|db| db == self || !db.available? || db.version_cmp(self) >= 0}
  else
    true
  end
end

#prune_data_to_range(tables, keep_min_id, keep_max_id) ⇒ Object

Cleans up all rows that should no longer be on this db. Supply the ID range (in terms of the table’s sharding key) of rows to KEEP.



216
217
218
219
220
221
222
223
224
# File 'lib/jetpants/db/import_export.rb', line 216

def prune_data_to_range(tables, keep_min_id, keep_max_id)
  reconnect(user: app_credentials[:user])
  tables.each do |t|
    output "Cleaning up data, pruning to only keep range #{keep_min_id}-#{keep_max_id}", t
    rows_deleted = 0
    [:asc, :desc].each {|direction| rows_deleted += delete_table_data_outside_range(t, keep_min_id, keep_max_id, direction)}
    output "Done cleanup; #{rows_deleted} rows deleted", t
  end
end

#query(sql, *binds) ⇒ Object

Execute a write (INSERT, UPDATE, DELETE, REPLACE, etc) query. If the query is an INSERT, returns the last insert ID (if an auto_increment column is involved). Otherwise returns the number of affected rows.



114
115
116
117
# File 'lib/jetpants/db/client.rb', line 114

def query(sql, *binds)
  ds = connection.fetch(sql, *binds)
  connection.execute_dui(ds.update_sql) {|c| return c.last_id > 0 ? c.last_id : c.affected_rows}
end

#query_return_array(sql, *binds) ⇒ Object

Execute a read (SELECT) query. Returns an array of hashes.



120
121
122
# File 'lib/jetpants/db/client.rb', line 120

def query_return_array(sql, *binds)
  connection.fetch(sql, *binds).all
end

#query_return_first(sql, *binds) ⇒ Object

Execute a read (SELECT) query. Returns a hash of the first row only.



125
126
127
# File 'lib/jetpants/db/client.rb', line 125

def query_return_first(sql, *binds)
  connection.fetch(sql, *binds).first
end

#query_return_first_value(sql, *binds) ⇒ Object

Execute a read (SELECT) query. Returns the value of the first column of the first row only.



130
131
132
# File 'lib/jetpants/db/client.rb', line 130

def query_return_first_value(sql, *binds)
  connection.fetch(sql, *binds).single_value
end

#read_only?Boolean

Returns true if the global READ_ONLY variable is set, false otherwise.

Returns:

  • (Boolean)


92
93
94
# File 'lib/jetpants/db/state.rb', line 92

def read_only?
  global_variables[:read_only].downcase == 'on'
end

#rebuild!(tables = false, min_id = false, max_id = false) ⇒ Object

Exports and re-imports data for the specified tables, optionally bounded by the given range. Useful for defragmenting a node. Also useful for doing fast schema alterations, if alter_schemata (or its callbacks) has been implemented.

You can omit all params for a shard, in which case the method will use the list of sharded tables in the Jetpants config file, and will use the shard’s min and max ID.



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/jetpants/db/import_export.rb', line 272

def rebuild!(tables=false, min_id=false, max_id=false)
  raise "Cannot rebuild an active node" unless is_standby? || for_backups?
  
  p = pool
  if p.is_a?(Shard)
    tables ||= Table.from_config 'sharded_tables'
    min_id ||= p.min_id
    max_id ||= p.max_id if p.max_id != 'INFINITY'
  end
  raise "No tables supplied" unless tables && tables.count > 0
  
  disable_monitoring
  stop_query_killer
  restart_mysql '--skip-log-bin', '--skip-log-slave-updates', '--innodb-autoinc-lock-mode=2', '--skip-slave-start'
  
  # Automatically detect missing min/max. Assumes that all tables' primary keys
  # are on the same scale, so this may be non-ideal, but better than just erroring.
  unless min_id
    tables.each do |t|
      my_min = query_return_first_value "SELECT MIN(#{t.sharding_keys[0]}) FROM #{t.name}"
      min_id = my_min if !min_id || my_min < min_id
    end
  end
  unless max_id
    @found_max_ids = {} # we store the detected maxes in case DB#alter_schemata needs them later
    tables.each do |t|
      my_max = @found_max_ids[t.name] = query_return_first_value("SELECT MAX(#{t.sharding_keys[0]}) FROM #{t.name}")
      max_id = my_max if !max_id || my_max > max_id
    end
  end
  
  export_schemata tables
  export_data tables, min_id, max_id
  import_schemata!
  alter_schemata if respond_to? :alter_schemata
  import_data tables, min_id, max_id
  
  restart_mysql
  catch_up_to_master if is_slave?
  start_query_killer
  enable_monitoring
end

#reconnect(options = {}) ⇒ Object

Disconnects and reconnects to the database.



85
86
87
88
# File 'lib/jetpants/db/client.rb', line 85

def reconnect(options={})
  disconnect # force disconnection even if we're not changing user or schema
  connect(options)
end

#repl_binlog_coordinates(display_info = true) ⇒ Object

Use this on a slave to return [master log file name, position] for how far this slave has executed (in terms of its master’s binlogs) in its SQL replication thread.



178
179
180
181
182
183
184
# File 'lib/jetpants/db/replication.rb', line 178

def repl_binlog_coordinates(display_info=true)
  raise "This instance is not a slave" unless master
  status = slave_status
  file, pos = status[:relay_master_log_file], status[:exec_master_log_pos].to_i
  output "Has executed through master's binlog coordinates of (#{file}, #{pos})." if display_info
  [file, pos]
end

#repl_paused?Boolean

Returns true if replication is paused on this instance, false if it isn’t, or nil if this instance isn’t a slave (or if we can’t tell because the instance isn’t running)

Returns:

  • (Boolean)


27
28
29
30
31
# File 'lib/jetpants/db/state.rb', line 27

def repl_paused?
  return nil unless master
  probe if @repl_paused.nil?
  @repl_paused
end

#replicating?Boolean

Returns true if the MySQL slave I/O thread and slave SQL thread are both running, false otherwise. Note that this always checks the current actual state of the instance, as opposed to DB#repl_paused? which just remembers the state from the previous probe and any actions since then.

Returns:

  • (Boolean)


74
75
76
77
# File 'lib/jetpants/db/state.rb', line 74

def replicating?
  status = slave_status
  [status[:slave_io_running], status[:slave_sql_running]].all? {|s| s && s.downcase == 'yes'}
end

#replication_credentialsObject

Reads an existing master.info file on this db or one of its slaves, propagates the info back to the Jetpants singleton, and returns it as a hash containing :user and :pass. If the node is not a slave and has no slaves, will use the global Jetpants config instead.



259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/jetpants/db/replication.rb', line 259

def replication_credentials
  user = false
  pass = false
  if master || slaves.count > 0
    target = (@master ? self : @slaves[0])
    results = target.ssh_cmd("cat #{mysql_directory}/master.info | head -6 | tail -2").split
    if results.count == 2 && results[0] != 'test'
      user, pass = results
    end
  end
  user && pass ? {user: user, pass: pass} : Jetpants.replication_credentials
end

#respond_to?(name, include_private = false) ⇒ Boolean

Alters respond_to? logic to account for delegation of missing methods to the instance’s Host.

Returns:

  • (Boolean)


88
89
90
# File 'lib/jetpants/db.rb', line 88

def respond_to?(name, include_private=false)
  super || @host.respond_to?(name)
end

#restart_mysql(*options) ⇒ Object

Restarts MySQL.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/jetpants/db/server.rb', line 46

def restart_mysql(*options)
  if @master
    @repl_paused = options.include?('--skip-slave-start')
  end
  
  # Disconnect if we were previously connected
  user, schema = false, false
  if @db
    user, schema = @user, @schema
    disconnect
  end
  
  if options.size == 0
    output "Attempting to restart MySQL, no option overrides supplied"
  else
    output "Attempting to restart MySQL with options #{options.join(' ')}"
  end
  output service(:restart, 'mysql', options.join(' '))
  @options = options
  confirm_listening
  @running = true
  unless @options.include?('--skip-networking')
    disable_read_only! if role == :master
    
    # Reconnect if we were previously connected
    connect(user: user, schema: schema) if user || schema
  end
end

#resume_replicationObject Also known as: start_replication

Starts replication, or restarts replication after a pause



64
65
66
67
68
69
70
# File 'lib/jetpants/db/replication.rb', line 64

def resume_replication
  raise "This DB object has no master" unless master
  repl_binlog_coordinates(true)
  output "Resuming replication from #{@master}."
  output mysql_root_cmd "START SLAVE"
  @repl_paused = false
end

#revoke_all_access!Object

Disables access to a DB by the application user, and sets the DB to read-only. Useful when decommissioning instances from a shard that’s been split, or a former slave that’s been permanently removed from the pool



88
89
90
91
92
# File 'lib/jetpants/db/privileges.rb', line 88

def revoke_all_access!
  user_name = app_credentials[:user]
  enable_read_only!
  drop_user(user_name, true) # drop the user without replicating the drop statement to slaves
end

#revoke_privileges(username = false, database = false, *privileges) ⇒ Object

Revokes privileges from the given username for the specified database. Pass in privileges as additional params, each as strings. You may omit parameters to use the defaults in the Jetpants config file.



59
60
61
# File 'lib/jetpants/db/privileges.rb', line 59

def revoke_privileges(username=false, database=false, *privileges)
  grant_or_revoke_privileges('REVOKE', username, database, privileges)
end

#roleObject

Determines the DB’s role in its pool. Returns either :master, :active_slave, :standby_slave, or :backup_slave.

Note that we consider a node with no master and no slaves to be a :master, since we can’t determine if it had slaves but they’re just offline/dead, vs it being an orphaned machine.

In hierarchical replication scenarios (such as the child shard masters in the middle of a shard split), we return :master if Jetpants.topology considers the node to be the master for a pool.



264
265
266
267
268
269
270
271
272
273
274
# File 'lib/jetpants/db/state.rb', line 264

def role
  p = pool
  case
  when !@master then :master                                # nodes that aren't slaves (including orphans) 
  when p.master == self then :master                        # nodes that the topology thinks are masters
  when for_backups? then :backup_slave
  when p && p.active_slave_weights[self] then :active_slave # if pool in topology, determine based on expected/ideal state
  when !p && !is_standby? then :active_slave                # if pool missing from topology, determine based on actual state
  else :standby_slave
  end
end

#row_counts(tables, min_id, max_id) ⇒ Object

Counts rows falling between min_id and max_id for the supplied tables. Returns a hash mapping table names to counts. Note: runs 10 concurrent queries to perform the count quickly. This is MUCH faster than doing a single count, but far more I/O intensive, so don’t use this on a master or active slave.



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/jetpants/db/import_export.rb', line 194

def row_counts(tables, min_id, max_id)
  tables = [tables] unless tables.is_a? Array
  lock = Mutex.new
  row_count = {}
  tables.each do |t|
    row_count[t.name] = 0
    if min_id && max_id && t.chunks > 1
      (min_id..max_id).in_chunks(t.chunks, Jetpants.max_concurrency) do |min, max|
        result = query_return_first_value(t.sql_count_rows(min, max))
        lock.synchronize {row_count[t.name] += result}
      end
    else
      row_count[t.name] = query_return_first_value(t.sql_count_rows(false, false))
    end
    output "#{row_count[t.name]} rows counted", t
  end
  row_count
end

#running?Boolean

Returns true if MySQL is running for this instance, false otherwise. Note that if the host isn’t available/online/reachable, we consider MySQL to not be running.

Returns:

  • (Boolean)


36
37
38
39
# File 'lib/jetpants/db/state.rb', line 36

def running?
  probe if @running.nil?
  @running
end

#same_host_as?(db) ⇒ Boolean

Returns true if the supplied Jetpants::DB is on the same Jetpants::Host as self.

Returns:

  • (Boolean)


94
95
96
# File 'lib/jetpants/db.rb', line 94

def same_host_as?(db)
  @ip == db.ip
end

#seconds_behind_masterObject

Returns the number of seconds beind the master the replication execution is, as reported by SHOW SLAVE STATUS.



199
200
201
202
203
# File 'lib/jetpants/db/replication.rb', line 199

def seconds_behind_master
  raise "This instance is not a slave" unless master
  lag = slave_status[:seconds_behind_master]
  lag == 'NULL' ? nil : lag.to_i
end

#slave_statusObject

Returns a hash containing the information from SHOW SLAVE STATUS



243
244
245
246
247
248
249
250
251
252
# File 'lib/jetpants/db/replication.rb', line 243

def slave_status
  hash = mysql_root_cmd('SHOW SLAVE STATUS', :parse=>true)
  hash = {} if hash[:master_user] == 'test'
  if @master && hash.count < 1
    message = "should be a slave of #{@master}, but SHOW SLAVE STATUS indicates otherwise"
    raise "#{self}: #{message}" if Jetpants.verify_replication
    output message
  end
  hash
end

#slavesObject

Returns an Array of Jetpants::DB instances that are slaving from this instance, or nil if we can’t tell because this instance isn’t running.



18
19
20
21
22
# File 'lib/jetpants/db/state.rb', line 18

def slaves
  return nil unless running? || @slaves
  probe if @slaves.nil?
  @slaves
end

#start_mysql(*options) ⇒ Object

Starts MySQL, and confirms that something is now listening on the port. Raises an exception if MySQL is already running or if something else is already running on its port. Options should be supplied as positional method args, for example:

start_mysql '--skip-networking', '--skip-grant-tables'


25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/jetpants/db/server.rb', line 25

def start_mysql(*options)
  if @master
    @repl_paused = options.include?('--skip-slave-start')
  end
  running = ssh_cmd "netstat -ln | grep ':#{@port}' | wc -l"
  raise "[#{@ip}] Failed to start MySQL: Something is already listening on port #{@port}" unless running.chomp == '0'
  if options.size == 0
    output "Attempting to start MySQL, no option overrides supplied"
  else
    output "Attempting to start MySQL with options #{options.join(' ')}"
  end
  output service(:start, 'mysql', options.join(' '))
  @options = options
  confirm_listening
  @running = true
  if role == :master && ! @options.include?('--skip-networking')
    disable_read_only!
  end
end

#start_query_killerObject

Has no built-in effect. Plugins can override it, and/or implement before_start_query_killer and after_start_query_killer callbacks.



82
83
# File 'lib/jetpants/db/server.rb', line 82

def start_query_killer
end

#stop_mysqlObject

Shuts down MySQL, and confirms that it is no longer listening. OK to use this if MySQL is already stopped; it’s a no-op then.



10
11
12
13
14
15
16
17
18
# File 'lib/jetpants/db/server.rb', line 10

def stop_mysql
  output "Attempting to shutdown MySQL"
  disconnect if @db
  output service(:stop, 'mysql')
  running = ssh_cmd "netstat -ln | grep ':#{@port}' | wc -l"
  raise "[#{@ip}] Failed to shut down MySQL: Something is still listening on port #{@port}" unless running.chomp == '0'
  @options = []
  @running = false
end

#stop_query_killerObject

Has no built-in effect. Plugins can override it, and/or implement before_stop_query_killer and after_stop_query_killer callbacks.



77
78
# File 'lib/jetpants/db/server.rb', line 77

def stop_query_killer
end

#tablesObject

List of tables (as defined by the pool)



36
37
38
# File 'lib/jetpants/db/schema.rb', line 36

def tables
  pool(true).tables
end

#taking_connections?(max = 4, interval = 2.0, threshold = 1) ⇒ Boolean

Confirms instance has no more than [max] connections currently (AS VISIBLE TO THE APP USER), and in [interval] seconds hasn’t received more than [threshold] additional connections. You may need to adjust max if running multiple query killers, monitoring agents, etc.

Returns:

  • (Boolean)


101
102
103
104
105
106
107
# File 'lib/jetpants/db/state.rb', line 101

def taking_connections?(max=4, interval=2.0, threshold=1)
  current_conns = query_return_array('show processlist').count
  return true if current_conns > max
  conn_counter = global_status[:Connections].to_i
  sleep(interval)
  global_status[:Connections].to_i - conn_counter > threshold
end

#taking_writes?(interval = 5.0) ⇒ Boolean

Confirms the binlog of this node has not moved during a duration of [interval] seconds.

Returns:

  • (Boolean)


133
134
135
136
137
# File 'lib/jetpants/db/state.rb', line 133

def taking_writes?(interval=5.0)
  coords = binlog_coordinates
  sleep(interval)
  coords != binlog_coordinates
end

#to_dbObject

Returns self, since self is already a Jetpants::DB.



124
125
126
# File 'lib/jetpants/db.rb', line 124

def to_db
  self
end

#to_hostObject

Returns the instance’s Jetpants::Host.



129
130
131
# File 'lib/jetpants/db.rb', line 129

def to_host
  @host
end

#to_sObject

Returns a string in the form “ip:port”



119
120
121
# File 'lib/jetpants/db.rb', line 119

def to_s
  "#{@ip}:#{@port}"
end

#version_cmp(db, precision = 2) ⇒ Object

Returns -1 if self is running a lower version than db; 1 if self is running a higher version; and 0 if running same version.



227
228
229
230
231
232
233
234
235
236
# File 'lib/jetpants/db/state.rb', line 227

def version_cmp(db, precision=2)
  raise "Invalid precision #{precision}" if precision < 1 || precision > 3
  my_tuple = version_tuple[0, precision]
  other_tuple = db.version_tuple[0, precision]
  my_tuple.each_with_index do |subver, i|
    return -1 if subver < other_tuple[i]
    return 1 if subver > other_tuple[i]
  end
  0
end

#version_tupleObject

Returns an array of integers representing the version of the MySQL server. For example, Percona Server 5.5.27-rel28.1-log would return [5, 5, 27]



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/jetpants/db/state.rb', line 200

def version_tuple
  result = nil
  if running?
    # If the server is running, we can just query it
    result = global_variables[:version].split('.', 3).map(&:to_i) rescue nil
  end
  if result.nil?
    # Otherwise we need to parse the output of mysqld --version
    output = ssh_cmd 'mysqld --version'
    matches = output.downcase.match('ver\s*(\d+)\.(\d+)\.(\d+)')
    raise "Unable to determine version for #{self}" unless matches
    result = matches[1, 3].map(&:to_i)
  end
  result
end