Class: Mysql::Protocol
- Inherits:
-
Object
- Object
- Mysql::Protocol
- Defined in:
- lib/mysql/protocol.rb
Overview
MySQL network protocol
Defined Under Namespace
Classes: AuthenticationPacket, AuthenticationResultPacket, ExecutePacket, FieldPacket, InitialPacket, PrepareResultPacket, ResultPacket, TlsAuthenticationPacket
Constant Summary collapse
- VERSION =
10
- MAX_PACKET_LENGTH =
2**24-1
- SSL_MODE_KEY =
{ SSL_MODE_DISABLED => 1, SSL_MODE_PREFERRED => 2, SSL_MODE_REQUIRED => 3, SSL_MODE_VERIFY_CA => 4, SSL_MODE_VERIFY_IDENTITY => 5, '1' => 1, '2' => 2, '3' => 3, '4' => 4, '5' => 5, 'disabled' => 1, 'preferred' => 2, 'required' => 3, 'verify_ca' => 4, 'verify_identity' => 5, :disabled => 1, :preferred => 2, :required => 3, :verify_ca => 4, :verify_identity => 5, }.freeze
Instance Attribute Summary collapse
-
#affected_rows ⇒ Object
readonly
Returns the value of attribute affected_rows.
-
#charset ⇒ Object
Returns the value of attribute charset.
-
#client_flags ⇒ Object
readonly
Returns the value of attribute client_flags.
-
#field_count ⇒ Object
readonly
Returns the value of attribute field_count.
-
#get_server_public_key ⇒ Object
readonly
Returns the value of attribute get_server_public_key.
-
#insert_id ⇒ Object
readonly
Returns the value of attribute insert_id.
-
#message ⇒ Object
readonly
Returns the value of attribute message.
-
#server_info ⇒ Object
readonly
Returns the value of attribute server_info.
-
#server_status ⇒ Object
readonly
Returns the value of attribute server_status.
-
#server_version ⇒ Object
readonly
Returns the value of attribute server_version.
-
#session_track ⇒ Object
readonly
Returns the value of attribute session_track.
-
#sqlstate ⇒ Object
readonly
Returns the value of attribute sqlstate.
-
#thread_id ⇒ Object
readonly
Returns the value of attribute thread_id.
-
#warning_count ⇒ Object
readonly
Returns the value of attribute warning_count.
Class Method Summary collapse
-
.net2value(pkt, type, unsigned) ⇒ Object
Convert netdata to Ruby value.
-
.value2net(v) ⇒ Integer, String
Convert a Ruby value to netdata.
Instance Method Summary collapse
-
#authenticate ⇒ Object
Perform the initial negotiation and authenticate.
- #check_state(st) ⇒ Object
- #close ⇒ Object
- #enable_ssl ⇒ Object
- #gc_stmt(stmt_id) ⇒ Object
-
#get_result ⇒ integer?
get result of query.
-
#initialize(opts) ⇒ Protocol
constructor
make socket connection to server.
-
#kill_command(pid) ⇒ Object
Kill command.
- #more_results? ⇒ Boolean
-
#ping_command ⇒ Object
Ping command.
-
#query_command(query) ⇒ Object
Query command.
-
#quit_command ⇒ Object
Quit command.
-
#read ⇒ Packet
Read one packet data.
-
#read_eof_packet ⇒ Object
Read EOF packet.
- #read_timeout(len, timeout) ⇒ Object
-
#refresh_command(op) ⇒ Object
Refresh command.
-
#reset ⇒ Object
Reset sequence number.
-
#retr_all_records(record_class) ⇒ Array<record_class>
Retrieve all records for simple query or prepared statement.
-
#retr_fields ⇒ Array<Mysql::Field>
Retrieve n fields.
-
#retr_record(record_class) ⇒ <record_class>?
Retrieve one record for simple query or prepared statement.
-
#send_local_file(filename) ⇒ Object
send local file to server.
-
#set_option_command(opt) ⇒ Object
Set option command.
- #set_state(st) ⇒ Object
-
#shutdown_command(level) ⇒ Object
Shutdown command.
-
#simple_command(packet) ⇒ String
Send simple command.
- #ssl_cipher ⇒ Object
-
#statistics_command ⇒ Object
Statistics command.
-
#stmt_close_command(stmt_id) ⇒ Object
Stmt close command.
-
#stmt_execute_command(stmt_id, values) ⇒ Integer
Stmt execute command.
-
#stmt_prepare_command(stmt) ⇒ Array<Integer, Integer, Array<Field>>
Stmt prepare command.
- #synchronize(before: nil, after: nil, error: nil) ⇒ Object
-
#write(data) ⇒ Object
Write one packet data.
- #write_timeout(data, timeout) ⇒ Object
Constructor Details
#initialize(opts) ⇒ Protocol
make socket connection to server.
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/mysql/protocol.rb', line 165
# Set up per-connection state and open the socket to the server.
#
# The socket is chosen in this order: an already-open IO passed as
# opts[:io]; a UNIX domain socket when the host is missing, empty or
# "localhost"; otherwise a TCP connection to opts[:host].
#
# @param opts [Hash] connection options (:io, :host, :socket, :port,
#   :connect_timeout, :get_server_public_key are read here)
# @raise [ClientError] when the connection attempt times out
def initialize(opts)
  @mutex = Mutex.new
  @opts = opts
  @charset = Mysql::Charset.by_name("utf8mb4")
  @insert_id = 0
  @warning_count = 0
  @session_track = {}
  @gc_stmt_queue = [] # statement ids queued by #gc_stmt, closed later in #set_state
  set_state :INIT
  @get_server_public_key = @opts[:get_server_public_key]
  begin
    host = @opts[:host]
    if @opts[:io]
      @socket = @opts[:io]
    elsif host.nil? || host.empty? || host == "localhost"
      path = @opts[:socket] || ENV["MYSQL_UNIX_PORT"] || MYSQL_UNIX_PORT
      @socket = Socket.unix(path)
    else
      port = @opts[:port] || ENV["MYSQL_TCP_PORT"] || begin
        Socket.getservbyname("mysql", "tcp")
      rescue StandardError
        MYSQL_TCP_PORT # service database has no "mysql" entry
      end
      @socket = Socket.tcp(host, port, connect_timeout: @opts[:connect_timeout])
    end
  rescue Errno::ETIMEDOUT
    raise ClientError, "connection timeout"
  end
end
Instance Attribute Details
#affected_rows ⇒ Object (readonly)
Returns the value of attribute affected_rows.
127 128 129 |
# File 'lib/mysql/protocol.rb', line 127
# Reader for @affected_rows (assigned from the result packet in #get_result).
#
# @return [Object] the value of attribute affected_rows
def affected_rows
  @affected_rows
end
#charset ⇒ Object
Returns the value of attribute charset.
135 136 137 |
# File 'lib/mysql/protocol.rb', line 135
# Reader for @charset.
#
# @return [Object] the value of attribute charset
def charset
  @charset
end
#client_flags ⇒ Object (readonly)
Returns the value of attribute client_flags.
125 126 127 |
# File 'lib/mysql/protocol.rb', line 125
# Reader for @client_flags (built in #authenticate).
#
# @return [Object] the value of attribute client_flags
def client_flags
  @client_flags
end
#field_count ⇒ Object (readonly)
Returns the value of attribute field_count.
134 135 136 |
# File 'lib/mysql/protocol.rb', line 134
# Reader for @field_count (assigned in #get_result).
#
# @return [Object] the value of attribute field_count
def field_count
  @field_count
end
#get_server_public_key ⇒ Object (readonly)
Returns the value of attribute get_server_public_key.
133 134 135 |
# File 'lib/mysql/protocol.rb', line 133
# Reader for @get_server_public_key (copied from the options in #initialize).
#
# @return [Object] the value of attribute get_server_public_key
def get_server_public_key
  @get_server_public_key
end
#insert_id ⇒ Object (readonly)
Returns the value of attribute insert_id.
128 129 130 |
# File 'lib/mysql/protocol.rb', line 128
# Reader for @insert_id (starts at 0, updated in #get_result).
#
# @return [Object] the value of attribute insert_id
def insert_id
  @insert_id
end
#message ⇒ Object (readonly)
Returns the value of attribute message.
131 132 133 |
# File 'lib/mysql/protocol.rb', line 131
# Reader for @message (assigned from the result packet in #get_result).
# The method name was missing from the extracted listing; it is restored
# here to match the "#message" entry in the attribute summary.
#
# @return [Object] the value of attribute message
def message
  @message
end
#server_info ⇒ Object (readonly)
Returns the value of attribute server_info.
122 123 124 |
# File 'lib/mysql/protocol.rb', line 122
# Reader for @server_info (the server version string stored in #authenticate).
#
# @return [Object] the value of attribute server_info
def server_info
  @server_info
end
#server_status ⇒ Object (readonly)
Returns the value of attribute server_status.
129 130 131 |
# File 'lib/mysql/protocol.rb', line 129
# Reader for @server_status (updated from result and EOF packets).
#
# @return [Object] the value of attribute server_status
def server_status
  @server_status
end
#server_version ⇒ Object (readonly)
Returns the value of attribute server_version.
123 124 125 |
# File 'lib/mysql/protocol.rb', line 123
# Reader for @server_version (numeric form computed in #authenticate).
#
# @return [Object] the value of attribute server_version
def server_version
  @server_version
end
#session_track ⇒ Object (readonly)
Returns the value of attribute session_track.
132 133 134 |
# File 'lib/mysql/protocol.rb', line 132
# Reader for @session_track (starts as an empty Hash, updated in #get_result).
#
# @return [Object] the value of attribute session_track
def session_track
  @session_track
end
#sqlstate ⇒ Object (readonly)
Returns the value of attribute sqlstate.
126 127 128 |
# File 'lib/mysql/protocol.rb', line 126
# Reader for @sqlstate (reset to "00000" by every #read).
#
# @return [Object] the value of attribute sqlstate
def sqlstate
  @sqlstate
end
#thread_id ⇒ Object (readonly)
Returns the value of attribute thread_id.
124 125 126 |
# File 'lib/mysql/protocol.rb', line 124
# Reader for @thread_id (taken from the initial packet in #authenticate).
#
# @return [Object] the value of attribute thread_id
def thread_id
  @thread_id
end
#warning_count ⇒ Object (readonly)
Returns the value of attribute warning_count.
130 131 132 |
# File 'lib/mysql/protocol.rb', line 130
# Reader for @warning_count (starts at 0, updated in #get_result).
#
# @return [Object] the value of attribute warning_count
def warning_count
  @warning_count
end
Class Method Details
.net2value(pkt, type, unsigned) ⇒ Object
Convert netdata to Ruby value
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/mysql/protocol.rb', line 25
# Convert netdata to a Ruby value.
#
# Reads exactly one value of the given field type from +pkt+.
#
# @param pkt [Packet] packet positioned at the value (must respond to
#   #lcs, #utiny, #ushort, #ulong and #read)
# @param type [Integer] one of the Field::TYPE_* constants
# @param unsigned [Boolean] whether integer types are interpreted as unsigned
# @return [Object] String, Integer, BigDecimal, Float, Date, Time or nil
#   (nil when a DATE/DATETIME payload cannot be turned into a valid value)
# @raise [RuntimeError] for a field type that is not handled
def self.net2value(pkt, type, unsigned)
  case type
  when Field::TYPE_STRING, Field::TYPE_VAR_STRING, Field::TYPE_BLOB, Field::TYPE_JSON, Field::TYPE_GEOMETRY
    return pkt.lcs
  when Field::TYPE_NEWDECIMAL
    s = pkt.lcs
    # Only a value with a non-zero fractional part becomes a BigDecimal.
    return s =~ /\./ && s !~ /\.0*\z/ ? BigDecimal(s) : s.to_i
  when Field::TYPE_TINY
    v = pkt.utiny
    return unsigned ? v : v < 128 ? v : v-256
  when Field::TYPE_SHORT
    v = pkt.ushort
    return unsigned ? v : v < 32768 ? v : v-65536
  when Field::TYPE_INT24, Field::TYPE_LONG
    v = pkt.ulong
    return unsigned ? v : v < 0x8000_0000 ? v : v-0x10000_0000
  when Field::TYPE_LONGLONG
    # Two little-endian 32-bit halves, low word first.
    n1, n2 = pkt.ulong, pkt.ulong
    v = (n2 << 32) | n1
    return unsigned ? v : v < 0x8000_0000_0000_0000 ? v : v-0x10000_0000_0000_0000
  when Field::TYPE_FLOAT
    return pkt.read(4).unpack1('e')
  when Field::TYPE_DOUBLE
    return pkt.read(8).unpack1('E')
  when Field::TYPE_DATE
    len = pkt.utiny
    y, m, d = pkt.read(len).unpack("vCC")
    # Invalid or empty dates yield nil instead of raising.
    t = Date.new(y, m, d) rescue nil
    return t
  when Field::TYPE_DATETIME, Field::TYPE_TIMESTAMP
    len = pkt.utiny
    y, m, d, h, mi, s, sp = pkt.read(len).unpack("vCCCCCV")
    return Time.new(y, m, d, h, mi, Rational(s.to_i*1000000+sp.to_i, 1000000)) rescue nil
  when Field::TYPE_TIME
    len = pkt.utiny
    sign, d, h, mi, s, sp = pkt.read(len).unpack("CVCCCV")
    r = d.to_i*86400 + h.to_i*3600 + mi.to_i*60 + s.to_i + sp.to_f/1000000
    # A short payload unpacks missing fields as nil. Coerce the sign byte so
    # that a zero-length TIME is 0.0 rather than being negated to -0.0.
    r *= -1 if sign.to_i != 0
    return r
  when Field::TYPE_YEAR
    return pkt.ushort
  when Field::TYPE_BIT
    return pkt.lcs
  else
    raise "not implemented: type=#{type}"
  end
end
.value2net(v) ⇒ Integer, String
Convert a Ruby value to netdata.
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/mysql/protocol.rb', line 78
# Convert a Ruby value to netdata.
#
# true/false are sent as the integers 1/0. Integers pick the narrowest
# encoding that fits: 32-bit, signed 64-bit, unsigned 64-bit (type flagged
# with 0x8000), and finally a decimal string for anything larger.
#
# @param v [Object] nil, true/false, Integer, BigDecimal, Float, String,
#   Time, DateTime or Date
# @return [Array(Integer, String)] field type and packed value
# @raise [ProtocolError] when the class of +v+ is not supported
def self.value2net(v)
  v = 1 if v == true
  v = 0 if v == false
  case v
  when nil
    [Field::TYPE_NULL, ""]
  when Integer
    if (-0x8000_0000...0x8000_0000).cover?(v)
      [Field::TYPE_LONG, [v].pack('V')]
    elsif (-0x8000_0000_0000_0000...0x8000_0000_0000_0000).cover?(v)
      [Field::TYPE_LONGLONG, [v&0xffffffff, v>>32].pack("VV")]
    elsif (0x8000_0000_0000_0000..0xffff_ffff_ffff_ffff).cover?(v)
      [Field::TYPE_LONGLONG | 0x8000, [v&0xffffffff, v>>32].pack("VV")]
    else
      [Field::TYPE_NEWDECIMAL, Packet.lcs(v.to_s)]
    end
  when BigDecimal
    [Field::TYPE_NEWDECIMAL, Packet.lcs(v.to_s)]
  when Float
    [Field::TYPE_DOUBLE, [v].pack("E")]
  when String
    [Field::TYPE_STRING, Packet.lcs(v)]
  when Time
    fields = [11, v.year, v.month, v.day, v.hour, v.min, v.sec, v.usec]
    [Field::TYPE_DATETIME, fields.pack("CvCCCCCV")]
  when DateTime # must precede Date: DateTime is a Date subclass
    micros = (v.sec_fraction*1000000).to_i
    fields = [11, v.year, v.month, v.day, v.hour, v.min, v.sec, micros]
    [Field::TYPE_DATETIME, fields.pack("CvCCCCCV")]
  when Date
    fields = [11, v.year, v.month, v.day, 0, 0, 0, 0]
    [Field::TYPE_DATE, fields.pack("CvCCCCCV")]
  else
    raise ProtocolError, "class #{v.class} is not supported"
  end
end
Instance Method Details
#authenticate ⇒ Object
Perform the initial negotiation and authenticate.
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/mysql/protocol.rb', line 197
# Perform the initial negotiation and authenticate.
#
# Reads the server's initial packet, records server information, builds
# the client capability flags, selects the character set, optionally
# upgrades the socket to TLS and finally delegates to Authenticator.
# Requires state :INIT and leaves the connection in state :READY.
def authenticate
  synchronize(before: :INIT, after: :READY) do
    reset
    greeting = InitialPacket.parse(read)
    @server_info = greeting.server_version
    # Fold the first three numeric components into one integer, major first.
    @server_version = greeting.server_version.split(/\D/)[0, 3].inject { |acc, part| acc.to_i*100+part.to_i }
    @server_capabilities = greeting.server_capabilities
    @thread_id = greeting.thread_id
    @client_flags =
      CLIENT_LONG_PASSWORD |
      CLIENT_LONG_FLAG |
      CLIENT_TRANSACTIONS |
      CLIENT_PROTOCOL_41 |
      CLIENT_SECURE_CONNECTION |
      CLIENT_MULTI_RESULTS |
      CLIENT_PS_MULTI_RESULTS |
      CLIENT_PLUGIN_AUTH |
      CLIENT_CONNECT_ATTRS |
      CLIENT_SESSION_TRACK |
      CLIENT_LOCAL_FILES
    @client_flags |= CLIENT_CONNECT_WITH_DB if @opts[:database]
    @client_flags |= @opts[:flags]
    requested = @opts[:charset]
    if requested
      @charset = requested.is_a?(Charset) ? requested : Charset.by_name(requested)
    else
      @charset = Charset.by_number(greeting.server_charset)
      @charset.encoding # raise error if unsupported charset
    end
    enable_ssl
    Authenticator.new(self).authenticate(
      @opts[:username],
      @opts[:password].to_s,
      @opts[:database],
      greeting.scramble_buff,
      greeting.auth_plugin,
      @opts[:connect_attrs]
    )
  end
end
#check_state(st) ⇒ Object
474 475 476 |
# File 'lib/mysql/protocol.rb', line 474
# Verify that the connection is in the expected state.
#
# @param st [Symbol] the state the caller requires
# @raise [Mysql::ClientError::CommandsOutOfSync] when @state differs from +st+
def check_state(st)
  return if @state == st

  raise Mysql::ClientError::CommandsOutOfSync, 'command out of sync'
end
#close ⇒ Object
190 191 192 |
# File 'lib/mysql/protocol.rb', line 190
# Close the socket, ignoring any StandardError raised while doing so
# (best-effort close).
#
# @return [nil]
def close
  begin
    @socket.close
  rescue StandardError
    nil
  end
end
#enable_ssl ⇒ Object
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/mysql/protocol.rb', line 242
# Upgrade the connection to TLS according to the :ssl_mode option.
#
# Nothing happens when the mode is "disabled", or when it is "preferred"
# and either the socket is a UNIX socket or the server lacks CLIENT_SSL.
# On success @socket is replaced by the connected SSLSocket and CLIENT_SSL
# is set in @client_flags.
#
# @raise [ClientError] when :ssl_mode is not a key of SSL_MODE_KEY
# @raise [ClientError::SslConnectionError] when TLS is required but the
#   server does not advertise CLIENT_SSL
# @raise [OpenSSL::SSL::SSLError] when TLS setup fails and the mode is
#   "required" or stricter
def enable_ssl
  # Normalise the option (constant, digit string, name or symbol) to 1..5.
  ssl_mode = SSL_MODE_KEY[@opts[:ssl_mode]]
  raise ClientError, "ssl_mode #{@opts[:ssl_mode]} is not supported" unless ssl_mode

  return if ssl_mode == SSL_MODE_DISABLED
  if ssl_mode == SSL_MODE_PREFERRED
    return if @socket.local_address.unix?
    return if @server_capabilities & CLIENT_SSL == 0
  end
  if ssl_mode >= SSL_MODE_REQUIRED && @server_capabilities & CLIENT_SSL == 0
    raise ClientError::SslConnectionError, "SSL is required but the server doesn't support it"
  end

  context = OpenSSL::SSL::SSLContext.new
  context.set_params(@opts[:ssl_context_params])
  context.verify_mode = OpenSSL::SSL::VERIFY_NONE if ssl_mode < SSL_MODE_VERIFY_CA
  context.verify_hostname = false if ssl_mode < SSL_MODE_VERIFY_IDENTITY
  ssl_socket = OpenSSL::SSL::SSLSocket.new(@socket, context)
  ssl_socket.sync_close = true
  ssl_socket.hostname = @opts[:host] if ssl_mode >= SSL_MODE_VERIFY_IDENTITY
  @client_flags |= CLIENT_SSL
  write Protocol::TlsAuthenticationPacket.serialize(@client_flags, 1024**3, @charset.number)
  ssl_socket.connect
  @socket = ssl_socket
rescue OpenSSL::SSL::SSLError => e
  @client_flags &= ~CLIENT_SSL
  # Compare the normalised mode: the raw option may be a String or Symbol,
  # for which `< SSL_MODE_REQUIRED` raises instead of falling back.
  return if ssl_mode < SSL_MODE_REQUIRED

  raise e
end
#gc_stmt(stmt_id) ⇒ Object
470 471 472 |
# File 'lib/mysql/protocol.rb', line 470
# Queue a statement id; queued ids are sent COM_STMT_CLOSE by #set_state
# the next time the connection becomes :READY.
#
# @param stmt_id [Integer] id of the statement to close later
# @return [Array] the queue itself
def gc_stmt(stmt_id)
  @gc_stmt_queue << stmt_id
end
#get_result ⇒ integer?
get result of query.
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 |
# File 'lib/mysql/protocol.rb', line 303
# Get the result of a query.
#
# Requires state :WAIT_RESULT. When the server announces a result set the
# state becomes :FIELD and the field count is returned. Otherwise the
# counters from the result packet are stored and nil is returned.
# A nil field count means the server is requesting a local file (LOAD DATA
# LOCAL INFILE): the file named in the packet's message is sent, then the
# real result packet is read.
#
# @return [Integer, nil] number of fields, or nil when there is no result set
def get_result
  synchronize(before: :WAIT_RESULT, error: :READY) do
    res_packet = ResultPacket.parse read
    @field_count = res_packet.field_count
    if @field_count.to_i > 0 # result data exists
      set_state :FIELD
      return @field_count
    end
    if @field_count.nil? # LOAD DATA LOCAL INFILE
      # `message` was missing after `res_packet.` in the extracted listing.
      send_local_file(res_packet.message)
      res_packet = ResultPacket.parse read
    end
    @affected_rows, @insert_id, @server_status, @warning_count, @message, @session_track =
      res_packet.affected_rows, res_packet.insert_id, res_packet.server_status,
      res_packet.warning_count, res_packet.message, res_packet.session_track
    set_state :READY unless more_results?
    return nil
  end
end
#kill_command(pid) ⇒ Object
Kill command
400 401 402 |
# File 'lib/mysql/protocol.rb', line 400
# Kill command: sends COM_PROCESS_KILL followed by +pid+ as a 32-bit
# little-endian integer.
#
# @param pid [Integer] id passed to the server
# @return [String] the server's response (see #simple_command)
def kill_command(pid)
  payload = [COM_PROCESS_KILL, pid].pack("CV")
  simple_command(payload)
end
#more_results? ⇒ Boolean
322 323 324 |
# File 'lib/mysql/protocol.rb', line 322
# Whether the last server status has the SERVER_MORE_RESULTS_EXISTS bit set.
#
# @return [Boolean]
def more_results?
  pending = @server_status & SERVER_MORE_RESULTS_EXISTS
  pending != 0
end
#ping_command ⇒ Object
Ping command
395 396 397 |
# File 'lib/mysql/protocol.rb', line 395
# Ping command: sends a single COM_PING byte.
#
# @return [String] the server's response (see #simple_command)
def ping_command
  payload = [COM_PING].pack("C")
  simple_command(payload)
end
#query_command(query) ⇒ Object
Query command
294 295 296 297 298 299 |
# File 'lib/mysql/protocol.rb', line 294
# Query command: sends COM_QUERY with the query converted to the
# connection charset. Requires state :READY; moves to :WAIT_RESULT on
# success and back to :READY on error.
#
# @param query [String] query text
def query_command(query)
  synchronize(before: :READY, after: :WAIT_RESULT, error: :READY) do
    reset
    converted = @charset.convert(query)
    write([COM_QUERY, converted].pack("Ca*"))
  end
end
#quit_command ⇒ Object
Quit command
280 281 282 283 284 285 286 287 288 289 290 |
# File 'lib/mysql/protocol.rb', line 280
# Quit command.
#
# Drains any pending result first so the connection is :READY, then sends
# COM_QUIT, closes the socket and drops queued statement ids. The three
# checks run in sequence on purpose: each step may advance @state to the
# condition tested by the next one.
def quit_command
  if @state == :WAIT_RESULT
    get_result
  end
  if @state == :FIELD
    retr_fields
  end
  if @state == :RESULT
    retr_all_records(RawRecord)
  end
  synchronize(before: :READY, after: :CLOSED) do
    reset
    write([COM_QUIT].pack("C"))
    close
    @gc_stmt_queue.clear
  end
end
#read ⇒ Packet
Read one packet data
515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 |
# File 'lib/mysql/protocol.rb', line 515
# Read one packet of data.
#
# Reads 4-byte headers (3-byte little-endian length plus sequence number)
# and their payloads, concatenating payloads while a chunk has the maximum
# length. An error packet (first byte 0xff) is turned into an exception.
#
# @return [Packet] the assembled packet
# @raise [ProtocolError] when the sequence number does not match @seq
# @raise [ClientError::ServerLost] on EOF or a TLS error (socket is closed)
# @raise [ClientError] on read timeout
# @raise [Mysql::ServerError] (or the mapped subclass) for an error packet
def read
  data = ''
  len = nil
  begin
    timeout = @state == :INIT ? @opts[:connect_timeout] : @opts[:read_timeout]
    header = read_timeout(4, timeout)
    raise EOFError unless header && header.length == 4
    len1, len2, seq = header.unpack("CvC")
    len = (len2 << 8) + len1
    raise ProtocolError, "invalid packet: sequence number mismatch(#{seq} != #{@seq}(expected))" if @seq != seq
    @seq = (@seq + 1) % 256
    ret = read_timeout(len, timeout)
    raise EOFError unless ret && ret.length == len
    data.concat ret
  rescue EOFError, OpenSSL::SSL::SSLError
    close
    raise ClientError::ServerLost, 'Lost connection to server during query'
  rescue Errno::ETIMEDOUT
    raise ClientError, "read timeout"
  end while len == MAX_PACKET_LENGTH

  @sqlstate = "00000"

  # Error packet. Compare the raw byte so the check does not depend on the
  # source-file encoding of a string literal.
  if data.getbyte(0) == 0xff
    # `message` was missing from the extracted listing and is restored here.
    _, errno, marker, @sqlstate, message = data.unpack("Cvaa5a*")
    unless marker == "#"
      _, errno, message = data.unpack("Cva*") # Version 4.0 Error
      @sqlstate = ""
    end
    # @server_status is still nil if the error arrives before any status
    # was received; `nil & x` would silently store `false`.
    @server_status = (@server_status || 0) & ~SERVER_MORE_RESULTS_EXISTS
    message.force_encoding(@charset.encoding)
    if Mysql::ServerError::ERROR_MAP.key? errno
      raise Mysql::ServerError::ERROR_MAP[errno].new(message, @sqlstate)
    end
    raise Mysql::ServerError.new(message, @sqlstate, errno)
  end
  Packet.new(data)
end
#read_eof_packet ⇒ Object
Read EOF packet
623 624 625 626 627 628 629 |
# File 'lib/mysql/protocol.rb', line 623
# Read an EOF packet and store the server status it carries.
#
# @return [Integer] the new server status
# @raise [ProtocolError] when the next packet is not an EOF packet
def read_eof_packet
  eof = read
  raise ProtocolError, "packet is not EOF" unless eof.eof?

  eof.utiny # 0xFE
  eof.ushort # warning count, read but not used
  @server_status = eof.ushort
end
#read_timeout(len, timeout) ⇒ Object
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 |
# File 'lib/mysql/protocol.rb', line 555
# Read +len+ bytes from the socket, giving up after +timeout+ seconds.
#
# With no timeout (nil or 0) this is a plain blocking read. Otherwise
# non-blocking reads are retried until +len+ bytes have arrived or the
# deadline passes.
#
# @param len [Integer] number of bytes wanted
# @param timeout [Numeric, nil] seconds to wait in total; nil or 0 blocks
# @return [String, nil] the bytes read; shorter than +len+, or nil when
#   nothing was read, if the peer closed the connection first
# @raise [Errno::ETIMEDOUT] when the deadline passes
def read_timeout(len, timeout)
  return @socket.read(len) if timeout.nil? || timeout == 0

  result = String.new
  e = Time.now + timeout
  while result.bytesize < len
    now = Time.now
    raise Errno::ETIMEDOUT if now > e
    r = @socket.read_nonblock(len - result.bytesize, exception: false)
    case r
    when :wait_readable
      IO.select([@socket], nil, nil, e - now) # rubocop:disable Lint/IncompatibleIoSelectWithFiberScheduler
    when :wait_writable
      IO.select(nil, [@socket], nil, e - now) # rubocop:disable Lint/IncompatibleIoSelectWithFiberScheduler
    when nil
      # EOF: with exception: false, read_nonblock returns nil. Report a short
      # read like the blocking branch does instead of failing on `<< nil`;
      # the caller checks the length and signals the lost connection.
      return result.empty? ? nil : result
    else
      result << r
    end
  end
  result
end
#refresh_command(op) ⇒ Object
Refresh command
405 406 407 |
# File 'lib/mysql/protocol.rb', line 405
# Refresh command: sends COM_REFRESH followed by +op+ as one byte.
#
# @param op [Integer] refresh option byte
# @return [String] the server's response (see #simple_command)
def refresh_command(op)
  payload = [COM_REFRESH, op].pack("CC")
  simple_command(payload)
end
#reset ⇒ Object
Reset sequence number
508 509 510 |
# File 'lib/mysql/protocol.rb', line 508
# Reset the packet sequence number; called at the start of each command.
#
# @return [Integer] 0
def reset
  # packet counter, checked by #read and stamped on packets by #write
  @seq = 0
end
#retr_all_records(record_class) ⇒ Array<record_class>
Retrieve all records for simple query or prepared statement
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 |
# File 'lib/mysql/protocol.rb', line 375
# Retrieve all records for a simple query or prepared statement.
#
# Requires state :RESULT. Reads packets until the EOF packet, then stores
# the server status from it. The state is always advanced afterwards (even
# on error) to :WAIT_RESULT or :READY depending on #more_results?.
#
# @param record_class [Class] class instantiated as new(packet, fields, encoding)
# @return [Array<record_class>] all records
def retr_all_records(record_class)
  synchronize(before: :RESULT) do
    encoding = charset.encoding
    begin
      records = []
      pkt = read
      until pkt.eof?
        records << record_class.new(pkt, @fields, encoding)
        pkt = read
      end
      pkt.utiny # 0xFE
      pkt.ushort # warning count, read but not used
      @server_status = pkt.ushort
      @no_more_records = true
      records
    ensure
      set_state(more_results? ? :WAIT_RESULT : :READY)
    end
  end
end
#retr_fields ⇒ Array<Mysql::Field>
Retrieve n fields
341 342 343 344 345 346 347 348 |
# File 'lib/mysql/protocol.rb', line 341
# Retrieve @field_count field definitions followed by an EOF packet.
# Requires state :FIELD; moves to :RESULT on success, :READY on error.
#
# @return [Array<Mysql::Field>] the fields of the result set
def retr_fields
  synchronize(before: :FIELD, after: :RESULT, error: :READY) do
    @fields = Array.new(@field_count) { Field.new(FieldPacket.parse(read)) }
    read_eof_packet
    @no_more_records = false
    @fields
  end
end
#retr_record(record_class) ⇒ <record_class>?
Retrieve one record for simple query or prepared statement
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 |
# File 'lib/mysql/protocol.rb', line 354
# Retrieve one record for a simple query or prepared statement.
#
# Requires state :RESULT unless all records were already consumed. When the
# EOF packet is reached the server status is stored, the state advances to
# :WAIT_RESULT or :READY and nil is returned.
#
# @param record_class [Class] class instantiated as new(packet, fields, encoding)
# @return [record_class, nil] the next record, or nil when there are no more
def retr_record(record_class)
  return nil if @no_more_records

  synchronize(before: :RESULT) do
    encoding = charset.encoding
    pkt = read
    return record_class.new(pkt, @fields, encoding) unless pkt.eof?

    pkt.utiny
    pkt.ushort
    @server_status = pkt.ushort
    set_state(more_results? ? :WAIT_RESULT : :READY)
    @no_more_records = true
    return nil
  end
end
#send_local_file(filename) ⇒ Object
send local file to server
327 328 329 330 331 332 333 334 335 336 337 |
# File 'lib/mysql/protocol.rb', line 327
# Send a local file to the server (LOAD DATA LOCAL INFILE).
#
# The file name is supplied by the server, so it is only honoured when
# :local_infile is enabled or the file lies inside :load_data_local_dir.
# Otherwise an empty payload is sent, the server's reply is consumed and
# the request is rejected.
#
# @param filename [String] path requested by the server
# @raise [ClientError::LoadDataLocalInfileRejected] when access is not allowed
def send_local_file(filename)
  filename = File.absolute_path(filename)
  allowed_dir = @opts[:load_data_local_dir]
  if allowed_dir
    # Compare against "<dir>/" so that a sibling such as "<dir>_other/file"
    # does not pass a bare prefix check.
    allowed_dir = File.absolute_path(allowed_dir)
    allowed_dir += File::SEPARATOR unless allowed_dir.end_with?(File::SEPARATOR)
  end
  if @opts[:local_infile] || (allowed_dir && filename.start_with?(allowed_dir))
    File.open(filename){|f| write f}
    write nil # EOF
  else
    write nil # send empty data instead of file contents
    read # result packet
    raise ClientError::LoadDataLocalInfileRejected, 'LOAD DATA LOCAL INFILE file request rejected due to restrictions on access.'
  end
end
#set_option_command(opt) ⇒ Object
Set option command
410 411 412 |
# File 'lib/mysql/protocol.rb', line 410
# Set option command: sends COM_SET_OPTION followed by +opt+ as a 16-bit
# little-endian integer.
#
# @param opt [Integer] option value
# @return [String] the server's response (see #simple_command)
def set_option_command(opt)
  payload = [COM_SET_OPTION, opt].pack("Cv")
  simple_command(payload)
end
#set_state(st) ⇒ Object
478 479 480 481 482 483 484 485 486 487 488 489 490 |
# File 'lib/mysql/protocol.rb', line 478
# Record the new connection state.
#
# When the state becomes :READY and statement ids are queued (see
# #gc_stmt), a COM_STMT_CLOSE is sent for each of them, unless the socket
# is already closed. GC is disabled while the queue is drained and
# re-enabled afterwards only if it was enabled before.
#
# @param st [Symbol] new state
def set_state(st)
  @state = st
  return unless st == :READY
  return if @gc_stmt_queue.empty?
  return if @socket&.closed?

  gc_was_disabled = GC.disable
  begin
    while (stmt_id = @gc_stmt_queue.shift)
      reset
      write([COM_STMT_CLOSE, stmt_id].pack("CV"))
    end
  ensure
    GC.enable unless gc_was_disabled
  end
end
#shutdown_command(level) ⇒ Object
Shutdown command
415 416 417 |
# File 'lib/mysql/protocol.rb', line 415
# Shutdown command: sends COM_SHUTDOWN followed by +level+ as one byte.
#
# @param level [Integer] shutdown level byte
# @return [String] the server's response (see #simple_command)
def shutdown_command(level)
  payload = [COM_SHUTDOWN, level].pack("CC")
  simple_command(payload)
end
#simple_command(packet) ⇒ String
Send simple command
634 635 636 637 638 639 640 |
# File 'lib/mysql/protocol.rb', line 634
# Send a simple command and return the reply as a string.
# Requires state :READY and leaves the connection in :READY.
#
# @param packet [String] packed command payload
# @return [String] the response packet converted with #to_s
def simple_command(packet)
  synchronize(before: :READY, after: :READY) do
    reset
    write(packet)
    reply = read
    reply.to_s
  end
end
#ssl_cipher ⇒ Object
275 276 277 |
# File 'lib/mysql/protocol.rb', line 275
# Cipher reported by the socket when CLIENT_SSL is set in the client flags.
#
# @return [Object, nil] result of @socket.cipher, or nil without TLS
def ssl_cipher
  return nil unless @client_flags.allbits?(CLIENT_SSL)

  @socket.cipher
end
#statistics_command ⇒ Object
Statistics command
420 421 422 |
# File 'lib/mysql/protocol.rb', line 420
# Statistics command: sends a single COM_STATISTICS byte.
#
# @return [String] the server's response (see #simple_command)
def statistics_command
  payload = [COM_STATISTICS].pack("C")
  simple_command(payload)
end
#stmt_close_command(stmt_id) ⇒ Object
Stmt close command
459 460 461 462 463 464 465 466 467 468 |
# File 'lib/mysql/protocol.rb', line 459
# Stmt close command.
#
# Drains any pending result so the connection is :READY, sends
# COM_STMT_CLOSE for +stmt_id+ and removes the id from the GC queue.
# The three checks run in sequence on purpose: each step may advance
# @state to the condition tested by the next one.
#
# @param stmt_id [Integer] statement id to close
def stmt_close_command(stmt_id)
  if @state == :WAIT_RESULT
    get_result
  end
  if @state == :FIELD
    retr_fields
  end
  if @state == :RESULT
    retr_all_records(StmtRawRecord)
  end
  synchronize(before: :READY, after: :READY) do
    reset
    write([COM_STMT_CLOSE, stmt_id].pack("CV"))
    @gc_stmt_queue.delete(stmt_id)
  end
end
#stmt_execute_command(stmt_id, values) ⇒ Integer
Stmt execute command
450 451 452 453 454 455 |
# File 'lib/mysql/protocol.rb', line 450
# Stmt execute command: sends the serialized execute packet for a prepared
# statement. Requires state :READY; moves to :WAIT_RESULT on success and
# back to :READY on error.
#
# @param stmt_id [Integer] statement id
# @param values [Array] bound parameter values
def stmt_execute_command(stmt_id, values)
  synchronize(before: :READY, after: :WAIT_RESULT, error: :READY) do
    reset
    payload = ExecutePacket.serialize(stmt_id, Mysql::Stmt::CURSOR_TYPE_NO_CURSOR, values)
    write(payload)
  end
end
#stmt_prepare_command(stmt) ⇒ Array<Integer, Integer, Array<Field>>
Stmt prepare command
427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 |
# File 'lib/mysql/protocol.rb', line 427
# Stmt prepare command.
#
# Sends COM_STMT_PREPARE with the statement converted to the connection
# charset, skips the parameter definitions and reads the field definitions.
# Requires state :READY and leaves the connection in :READY.
#
# @param stmt [String] statement text
# @return [Array<Integer, Integer, Array<Field>>] statement id, parameter
#   count and result fields
def stmt_prepare_command(stmt)
  synchronize(before: :READY, after: :READY) do
    reset
    write([COM_STMT_PREPARE, charset.convert(stmt)].pack("Ca*"))
    prepared = PrepareResultPacket.parse(read)
    if prepared.param_count > 0
      prepared.param_count.times { read } # skip parameter packet
      read_eof_packet
    end
    fields = []
    if prepared.field_count > 0
      fields = prepared.field_count.times.map { Field.new(FieldPacket.parse(read)) }
      read_eof_packet
    end
    return prepared.statement_id, prepared.param_count, fields
  end
end
#synchronize(before: nil, after: nil, error: nil) ⇒ Object
492 493 494 495 496 497 498 499 500 501 502 503 504 505 |
# File 'lib/mysql/protocol.rb', line 492
# Run the block under the connection mutex with state bookkeeping.
#
# @param before [Symbol, nil] state required on entry (checked when given)
# @param after [Symbol, nil] state set when the block finishes without a
#   rescued error (also when the block returns early)
# @param error [Symbol, nil] state set when the block raises a StandardError
# @return [Object] the block's value
# @raise [Mysql::ClientError::CommandsOutOfSync] via #check_state
def synchronize(before: nil, after: nil, error: nil)
  @mutex.synchronize do
    check_state(before) if before
    begin
      return yield
    rescue
      set_state(error) if error
      failed = true # tells the ensure clause not to apply +after+
      raise
    ensure
      set_state(after) if after && !failed
    end
  end
end
#write(data) ⇒ Object
Write one packet data
579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 |
# File 'lib/mysql/protocol.rb', line 579
# Write one packet of data.
#
# nil sends a single empty packet. A String or IO-like object is sent in
# chunks of at most MAX_PACKET_LENGTH bytes, each with a 4-byte header
# (3-byte little-endian length plus sequence number). Output is buffered
# while the chunks are written and flushed at the end.
#
# @param data [String, IO, nil] payload, or nil for an empty packet
# @raise [ClientError::ServerGoneError] on EPIPE or a TLS error (socket is closed)
# @raise [ClientError] on write timeout
def write(data)
  timeout = @state == :INIT ? @opts[:connect_timeout] : @opts[:write_timeout]
  @socket.sync = false
  if data.nil?
    write_timeout([0, 0, @seq].pack("CvC"), timeout)
    @seq = (@seq + 1) % 256
  else
    source = data.is_a?(String) ? StringIO.new(data) : data
    while (chunk = source.read(MAX_PACKET_LENGTH))
      high, low = chunk.length.divmod(256)
      header = [low, high, @seq].pack("CvC")
      write_timeout(header + chunk, timeout)
      @seq = (@seq + 1) % 256
    end
  end
  @socket.sync = true
  @socket.flush
rescue Errno::EPIPE, OpenSSL::SSL::SSLError
  close
  raise ClientError::ServerGoneError, 'MySQL server has gone away'
rescue Errno::ETIMEDOUT
  raise ClientError, "write timeout"
end
#write_timeout(data, timeout) ⇒ Object
601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 |
# File 'lib/mysql/protocol.rb', line 601
# Write +data+ to the socket, giving up after +timeout+ seconds.
#
# With no timeout (nil or 0) this is a plain blocking write. Otherwise
# non-blocking writes are retried until everything is written or the
# deadline passes.
#
# @param data [String] bytes to send
# @param timeout [Numeric, nil] seconds to wait in total; nil or 0 blocks
# @return [Integer] number of bytes written
# @raise [Errno::ETIMEDOUT] when the deadline passes
def write_timeout(data, timeout)
  return @socket.write(data) if timeout.nil? || timeout == 0

  sent = 0
  deadline = Time.now + timeout
  until sent >= data.size
    now = Time.now
    raise Errno::ETIMEDOUT if now > deadline

    outcome = @socket.write_nonblock(data[sent..], exception: false)
    if outcome == :wait_readable
      IO.select([@socket], nil, nil, deadline - now) # rubocop:disable Lint/IncompatibleIoSelectWithFiberScheduler
    elsif outcome == :wait_writable
      IO.select(nil, [@socket], nil, deadline - now) # rubocop:disable Lint/IncompatibleIoSelectWithFiberScheduler
    else
      sent += outcome
    end
  end
  sent
end