Class: Neo4jBolt::BoltSocket
- Inherits: Object
- Inheritance chain: Object → Neo4jBolt::BoltSocket
- Defined in:
- lib/neo4j_bolt.rb
Instance Method Summary
- #_append(s) ⇒ Object
- #append(v) ⇒ Object
- #append_array(a) ⇒ Object
- #append_dict(d) ⇒ Object
- #append_int16(i) ⇒ Object
- #append_int32(i) ⇒ Object
- #append_int64(i) ⇒ Object
- #append_int8(i) ⇒ Object
- #append_s(s) ⇒ Object
- #append_token(i) ⇒ Object
- #append_uint16(i) ⇒ Object
- #append_uint32(i) ⇒ Object
- #append_uint64(i) ⇒ Object
- #append_uint8(i) ⇒ Object
- #assert(condition) ⇒ Object
- #bolt_error(code, message) ⇒ Object
- #connect ⇒ Object
- #disconnect ⇒ Object
- #fix_value(value) ⇒ Object
- #flush ⇒ Object
- #initialize ⇒ BoltSocket (constructor): returns a new instance of BoltSocket.
- #neo4j_query(query, data = {}, &block) ⇒ Object
- #neo4j_query_expect_one(query, data = {}) ⇒ Object
- #parse(buf) ⇒ Object
- #parse_dict(buf) ⇒ Object
- #parse_list(buf) ⇒ Object
- #parse_s(buf) ⇒ Object
- #read_response(&block) ⇒ Object
- #reset ⇒ Object
- #run_query(query, data = {}, &block) ⇒ Object
- #setup_constraints_and_indexes(constraints, indexes) ⇒ Object
- #transaction(&block) ⇒ Object
Constructor Details
#initialize ⇒ BoltSocket
Returns a new instance of BoltSocket.
280 281 282 |
# File 'lib/neo4j_bolt.rb', line 280
# Builds a BoltSocket with no open connection; every piece of connection
# state is initialised by delegating to #reset.
def initialize
  reset
end
Instance Method Details
#_append(s) ⇒ Object
297 298 299 |
# File 'lib/neo4j_bolt.rb', line 297
# Appends raw data to the outgoing message buffer (@buffer).
#
# The buffer is extended in place. The previous `@buffer += ...` form copied
# the whole buffer on every call, which made building large messages
# quadratic in their size.
#
# @param s [String, Object] a String contributes each of its bytes as
#   integers; any other value is appended as a single element
# @return [Array] the buffer after appending
def _append(s)
  @buffer.concat(s.is_a?(String) ? s.bytes : [s])
end
#append(v) ⇒ Object
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 |
# File 'lib/neo4j_bolt.rb', line 358
# Serialises a Ruby value into the outgoing buffer, dispatching on its class.
#
# Collections and strings are delegated to append_array / append_dict /
# append_s. nil, true and false are written as the single marker bytes
# 0xC0, 0xC3 and 0xC2. Integers use the smallest encoding whose range
# contains the value; Floats are written as marker 0xC1 followed by the
# 8 bytes produced by pack('G').
#
# @param v [Array, Set, Hash, String, Symbol, nil, true, false, Integer, Float]
# @raise [Neo4jBolt::IntegerOutOfRangeError] if an Integer does not fit into 64 signed bits
# @raise [RuntimeError] for any other class of value
def append(v)
  case v
  when Array
    append_array(v)
  when Set
    append_array(v.to_a)
  when Hash
    append_dict(v)
  when String
    append_s(v)
  when Symbol
    append_s(v.to_s)
  when NilClass
    append_uint8(0xC0)
  when TrueClass
    append_uint8(0xC3)
  when FalseClass
    append_uint8(0xC2)
  when Integer
    # The ranges are tested from narrowest to widest, so each value gets the
    # shortest encoding available.
    case v
    when -16..-1
      # Small negative numbers are stored in a single byte (0xF0..0xFF).
      append_uint8(0x100 + v)
    when 0...0x80
      append_uint8(v)
    when -0x80...0x80
      append_uint8(0xC8)
      append_int8(v)
    when -0x8000...0x8000
      append_uint8(0xC9)
      append_int16(v)
    when -0x80000000...0x80000000
      append_uint8(0xCA)
      append_int32(v)
    when -0x8000000000000000...0x8000000000000000
      append_uint8(0xCB)
      append_int64(v)
    else
      raise Neo4jBolt::IntegerOutOfRangeError.new()
    end
  when Float
    append_uint8(0xC1)
    _append([v].pack('G'))
  else
    raise "Type not supported: #{v.class}"
  end
end
#append_array(a) ⇒ Object
425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 |
# File 'lib/neo4j_bolt.rb', line 425
# Writes a list header sized for `a`, followed by every element serialised
# through #append.
#
# The header is a single byte (0x90 + size) for fewer than 16 entries;
# otherwise a marker byte (0xD4 / 0xD5 / 0xD6) followed by the size as an
# 8-, 16- or 32-bit unsigned integer.
#
# @param a [Array] the elements to serialise
# @raise [RuntimeError] if the list has 2**32 or more entries
def append_array(a)
  count = a.size
  if count < 16
    append_uint8(0x90 + count)
  elsif count < 0x100
    append_uint8(0xD4)
    append_uint8(count)
  elsif count < 0x10000
    append_uint8(0xD5)
    append_uint16(count)
  elsif count < 0x100000000
    append_uint8(0xD6)
    append_uint32(count)
  else
    raise "list cannot exceed 4G entries!"
  end
  a.each { |element| append(element) }
end
#append_dict(d) ⇒ Object
403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 |
# File 'lib/neo4j_bolt.rb', line 403
# Writes a dictionary header sized for `d`, followed by each key (as a
# string, via #append_s) and its value (via #append).
#
# The header is a single byte (0xA0 + size) for fewer than 16 entries;
# otherwise a marker byte (0xD8 / 0xD9 / 0xDA) followed by the size as an
# 8-, 16- or 32-bit unsigned integer.
#
# @param d [Hash] the entries to serialise
# @raise [RuntimeError] if the dictionary has 2**32 or more entries
def append_dict(d)
  entry_count = d.size
  if entry_count < 16
    append_uint8(0xA0 + entry_count)
  elsif entry_count < 0x100
    append_uint8(0xD8)
    append_uint8(entry_count)
  elsif entry_count < 0x10000
    append_uint8(0xD9)
    append_uint16(entry_count)
  elsif entry_count < 0x100000000
    append_uint8(0xDA)
    append_uint32(entry_count)
  else
    raise "dict cannot exceed 4G entries!"
  end
  d.each_pair do |key, value|
    append_s(key)
    append(value)
  end
end
#append_int16(i) ⇒ Object
326 327 328 |
# File 'lib/neo4j_bolt.rb', line 326
# Appends `i` to the buffer as a signed 16-bit big-endian integer
# (pack directive 's>').
def append_int16(i)
  packed = [i].pack('s>')
  _append(packed)
end
#append_int32(i) ⇒ Object
330 331 332 |
# File 'lib/neo4j_bolt.rb', line 330
# Appends `i` to the buffer as a signed 32-bit big-endian integer
# (pack directive 'l>').
def append_int32(i)
  packed = [i].pack('l>')
  _append(packed)
end
#append_int64(i) ⇒ Object
334 335 336 |
# File 'lib/neo4j_bolt.rb', line 334
# Appends `i` to the buffer as a signed 64-bit big-endian integer
# (pack directive 'q>').
def append_int64(i)
  packed = [i].pack('q>')
  _append(packed)
end
#append_int8(i) ⇒ Object
322 323 324 |
# File 'lib/neo4j_bolt.rb', line 322
# Appends `i` to the buffer as a signed 8-bit integer (pack directive 'c').
def append_int8(i)
  packed = [i].pack('c')
  _append(packed)
end
#append_s(s) ⇒ Object
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 |
# File 'lib/neo4j_bolt.rb', line 338
# Writes a string header sized by the string's byte length, followed by the
# string's bytes.
#
# The argument is converted with #to_s first, so non-String keys and values
# are accepted. The header is a single byte (0x80 + length) for fewer than
# 16 bytes; otherwise a marker byte (0xD0 / 0xD1 / 0xD2) followed by the
# length as an 8-, 16- or 32-bit unsigned integer.
#
# @param s [#to_s] the value to serialise as a string
# @raise [RuntimeError] if the string is 2**32 bytes or longer
def append_s(s)
  text = s.to_s
  length = text.bytesize
  if length < 16
    append_uint8(0x80 + length)
  elsif length < 0x100
    append_uint8(0xD0)
    append_uint8(length)
  elsif length < 0x10000
    append_uint8(0xD1)
    append_uint16(length)
  elsif length < 0x100000000
    append_uint8(0xD2)
    append_uint32(length)
  else
    raise "string cannot exceed 4GB!"
  end
  _append(text)
end
#append_token(i) ⇒ Object
305 306 307 308 |
# File 'lib/neo4j_bolt.rb', line 305
# Appends a message token (callers pass BoltMarker constants) as one
# unsigned byte.
def append_token(i)
  append_uint8 i
end
#append_uint16(i) ⇒ Object
310 311 312 |
# File 'lib/neo4j_bolt.rb', line 310
# Appends `i` to the buffer as an unsigned 16-bit big-endian integer
# (pack directive 'S>').
def append_uint16(i)
  packed = [i].pack('S>')
  _append(packed)
end
#append_uint32(i) ⇒ Object
314 315 316 |
# File 'lib/neo4j_bolt.rb', line 314
# Appends `i` to the buffer as an unsigned 32-bit big-endian integer
# (pack directive 'L>').
def append_uint32(i)
  packed = [i].pack('L>')
  _append(packed)
end
#append_uint64(i) ⇒ Object
318 319 320 |
# File 'lib/neo4j_bolt.rb', line 318
# Appends `i` to the buffer as an unsigned 64-bit big-endian integer
# (pack directive 'Q>').
def append_uint64(i)
  packed = [i].pack('Q>')
  _append(packed)
end
#append_uint8(i) ⇒ Object
301 302 303 |
# File 'lib/neo4j_bolt.rb', line 301
# Appends `i` to the buffer as a single unsigned byte (pack directive 'C').
def append_uint8(i)
  packed = [i].pack('C')
  _append(packed)
end
#assert(condition) ⇒ Object
293 294 295 |
# File 'lib/neo4j_bolt.rb', line 293
# Internal sanity check.
#
# @param condition [Object] anything truthy passes
# @return [nil] when the condition holds
# @raise [RuntimeError] "Assertion failed" when the condition is nil or false
def assert(condition)
  return if condition

  raise "Assertion failed"
end
#bolt_error(code, message) ⇒ Object
626 627 628 629 630 631 632 633 634 |
# File 'lib/neo4j_bolt.rb', line 626
# Maps a server failure (code + message) to an exception object. The
# exception is returned, not raised; the caller decides when to raise it.
#
# The `message` parameter is restored here: the listing had lost it from the
# signature and from every use, although the documented signature is
# bolt_error(code, message) and read_response passes two arguments.
#
# @param code [String] failure code reported by the server
# @param message [String] human-readable failure message
# @return [Exception] SyntaxError or ConstraintValidationFailedError for the
#   two recognised codes, otherwise a generic Error whose message is
#   "<code>\n<message>"
def bolt_error(code, message)
  case code
  when 'Neo.ClientError.Statement.SyntaxError'
    SyntaxError.new(message)
  when 'Neo.ClientError.Schema.ConstraintValidationFailed'
    ConstraintValidationFailedError.new(message)
  else
    Error.new("#{code}\n#{message}")
  end
end
#connect ⇒ Object
666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 |
# File 'lib/neo4j_bolt.rb', line 666
# Opens the TCP connection, performs the version handshake and sends HELLO
# (plus LOGON for protocol versions >= 5.1), leaving the connection in the
# READY state on success.
#
# Changes compared to the previous implementation:
# * a missing or truncated handshake reply now raises the regular
#   "Unable to establish connection" error instead of a NoMethodError;
# * the LOGON handler only moves to READY when the server answered SUCCESS
#   (it used to set READY unconditionally before looking at the marker);
# * the unused parsing of the server version string was removed.
#
# @raise [RuntimeError] if the handshake fails or no supported version is agreed
# @raise [UnexpectedServerResponse] if HELLO or LOGON is answered with an unexpected message
def connect()
  @socket = TCPSocket.new(Neo4jBolt.bolt_host, Neo4jBolt.bolt_port)
  # The line below is important, otherwise we'll have to wait 40ms before every read
  @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  @buffer = []
  # Handshake: a 4-byte preamble followed by four 4-byte version proposals
  # (the last two are zero, i.e. unused).
  @socket.write("\x60\x60\xB0\x17")
  @socket.write("\x00\x00\x04\x05")
  @socket.write("\x00\x00\x04\x04")
  @socket.write("\x00\x00\x00\x00")
  @socket.write("\x00\x00\x00\x00")
  handshake = @socket.read(4)
  # IO#read returns nil at EOF and may return fewer bytes than requested.
  if handshake.nil? || handshake.bytesize < 4
    raise "Unable to establish connection to Neo4j using Bolt!"
  end
  # Reading the reply as little-endian and dropping the low 16 bits leaves
  # (4th byte << 8) | 3rd byte, the form the version constants below use.
  @bolt_version = handshake.unpack('V').first >> 16
  unless [0x0504, 0x0404].include?(@bolt_version)
    raise "Unable to establish connection to Neo4j using Bolt!"
  end
  @state.set(ServerState::CONNECTED)

  append_uint8(0xb1)
  append_token(BoltMarker::BOLT_HELLO)
  append_dict({
    :routing => nil,
    :scheme => 'none',
    :user_agent => "neo4j_bolt/#{Neo4jBolt::VERSION}",
    :bolt_agent => {
      :product => "neo4j_bolt/#{Neo4jBolt::VERSION}",
    },
  })
  flush()
  read_response() do |data|
    if data[:marker] == BoltMarker::BOLT_SUCCESS
      if @bolt_version >= 0x0501
        @state.set(ServerState::AUTHENTICATION)
        append_uint8(0xb1)
        append_token(BoltMarker::BOLT_LOGON)
        append_dict({:scheme => 'none'})
        flush()
        read_response() do |data2|
          if data2[:marker] == BoltMarker::BOLT_SUCCESS
            @state.set(ServerState::READY)
          else
            raise UnexpectedServerResponse.new(data2[:marker])
          end
        end
      else
        @state.set(ServerState::READY)
      end
    elsif data[:marker] == BoltMarker::BOLT_FAILURE
      @state.set(ServerState::DEFUNCT)
    else
      raise UnexpectedServerResponse.new(data[:marker])
    end
  end
  @transaction = 0
  @transaction_failed = false
end
#disconnect ⇒ Object
722 723 724 725 726 727 |
# File 'lib/neo4j_bolt.rb', line 722
# Sends GOODBYE, closes the socket and marks the connection as DEFUNCT.
#
# Previously the socket was neither closed nor cleared. The file descriptor
# leaked, and because #transaction only reconnects when @socket is nil, the
# object could not be used again. Calling this without an open connection
# is now a no-op instead of an error.
#
# @return [nil] when there is no open connection; otherwise the result of
#   the state change
def disconnect()
  return if @socket.nil?

  begin
    append_uint8(0xb0)
    append_token(BoltMarker::BOLT_GOODBYE)
    flush()
  ensure
    # Release the descriptor even when sending GOODBYE failed, and clear the
    # reference so the next transaction opens a fresh connection.
    @socket.close
    @socket = nil
  end
  @state.set(ServerState::DEFUNCT)
end
#fix_value(value) ⇒ Object
797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 |
# File 'lib/neo4j_bolt.rb', line 797
# Recursively converts a parsed response value into its public form.
#
# * Hashes tagged with the node / relationship marker become Node /
#   Relationship objects (their properties are converted recursively).
# * Any other Hash is rebuilt with symbolised keys and converted values.
# * Arrays are converted element by element.
# * Everything else is returned unchanged.
#
# @param value [Object] a value produced by #parse
# @return [Object] the converted value
def fix_value(value)
  case value
  when Hash
    if value[:marker] == BoltMarker::BOLT_NODE
      Node.new(value[:id], value[:labels], fix_value(value[:properties]))
    elsif value[:marker] == BoltMarker::BOLT_RELATIONSHIP
      Relationship.new(
        value[:id],
        value[:start_node_id],
        value[:end_node_id],
        value[:type],
        fix_value(value[:properties])
      )
    else
      value.each_with_object({}) do |(key, inner), converted|
        converted[key.to_sym] = fix_value(inner)
      end
    end
  when Array
    value.map { |element| fix_value(element) }
  else
    value
  end
end
#flush ⇒ Object
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 |
# File 'lib/neo4j_bolt.rb', line 445
# Sends the buffered message to the socket and empties the buffer.
#
# The message is split into chunks of at most 0xffff bytes. Each chunk is
# preceded by its length as a 16-bit big-endian integer, and a zero length
# is written after the last chunk.
#
# @return [Array] the new, empty buffer
def flush()
  @buffer.each_slice(0xffff) do |chunk|
    @socket.write([chunk.size].pack('n'))
    @socket.write(chunk.pack('C*'))
  end
  @socket.write([0].pack('n'))
  @buffer = []
end
#neo4j_query(query, data = {}, &block) ⇒ Object
873 874 875 876 877 878 879 880 881 882 883 |
# File 'lib/neo4j_bolt.rb', line 873
# Runs a query and either streams the rows to the given block or collects
# them into an array.
#
# @param query [String] the query text
# @param data [Hash] query parameters
# @yield [row] each result row, when a block is given
# @return [Array, nil] all rows when called without a block, nil otherwise
def neo4j_query(query, data = {}, &block)
  if block_given?
    run_query(query, data) { |row| yield row }
    nil
  else
    collected = []
    run_query(query, data) { |row| collected << row }
    collected
  end
end
#neo4j_query_expect_one(query, data = {}) ⇒ Object
885 886 887 888 889 890 891 |
# File 'lib/neo4j_bolt.rb', line 885
# Runs a query that must produce exactly one row and returns that row.
#
# @param query [String] the query text
# @param data [Hash] query parameters
# @return [Object] the single result row
# @raise [ExpectedOneResultError] if the query yields zero or several rows
def neo4j_query_expect_one(query, data = {})
  rows = neo4j_query(query, data)
  return rows.first if rows.size == 1

  raise ExpectedOneResultError.new("Expected one result, but got #{rows.size}.")
end
#parse(buf) ⇒ Object
548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 |
# File 'lib/neo4j_bolt.rb', line 548
# Parses the next value from `buf`, dispatching on its marker byte.
#
# Strings, lists and dictionaries are delegated to parse_s / parse_list /
# parse_dict. Structures (markers 0xB0..0xBF) are returned as hashes with a
# :marker entry plus the named fields of the recognised structure types.
#
# Fix: the low nibble of a structure marker is its field count. It used to
# be read and ignored, so a structure carrying more fields than the fixed
# list below left the extra fields unread, and every later value was parsed
# from the wrong offset. Surplus fields are now parsed and discarded.
# NOTE(review): Bolt 5 nodes/relationships are expected to carry such extra
# fields (element ids) — confirm against the protocol documentation.
#
# @param buf [#peek, #next, ...] byte source (callers pass a BoltBuffer)
# @return [Object] the parsed value
# @raise [RuntimeError] on an unknown value marker or structure signature
def parse(buf)
  f = buf.peek
  case f
  when 0x80..0x8F, 0xD0, 0xD1, 0xD2
    parse_s(buf)
  when 0x90..0x9F, 0xD4, 0xD5, 0xD6
    parse_list(buf)
  when 0xA0..0xAF, 0xD8, 0xD9, 0xDA
    parse_dict(buf)
  when 0xB0..0xBF
    field_count = buf.next & 0xF
    marker = buf.next
    field_names =
      case marker
      when BoltMarker::BOLT_SUCCESS, BoltMarker::BOLT_FAILURE, BoltMarker::BOLT_RECORD
        [:data]
      when BoltMarker::BOLT_IGNORED
        []
      when BoltMarker::BOLT_NODE
        [:id, :labels, :properties]
      when BoltMarker::BOLT_RELATIONSHIP
        [:id, :start_node_id, :end_node_id, :type, :properties]
      else
        raise sprintf("Unknown marker: %02x", marker)
      end
    response = {:marker => marker}
    field_names.each { |name| response[name] = parse(buf) }
    # Consume fields this parser has no name for so the stream stays aligned
    # (a non-positive difference makes this a no-op).
    (field_count - field_names.size).times { parse(buf) }
    response
  when 0xC0
    buf.next
    nil
  when 0xC1
    buf.next
    buf.next_float()
  when 0xC2
    buf.next
    false
  when 0xC3
    buf.next
    true
  when 0xC8
    buf.next
    buf.next_int8()
  when 0xC9
    buf.next
    buf.next_int16()
  when 0xCA
    buf.next
    buf.next_int32()
  when 0xCB
    buf.next
    buf.next_int64()
  when 0xF0..0xFF
    # Single-byte negative integer (-16..-1).
    buf.next
    f - 0x100
  when 0..0x7F
    # Single-byte non-negative integer.
    buf.next
    f
  else
    raise sprintf("Unknown marker: %02x", f)
  end
end
#parse_dict(buf) ⇒ Object
503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 |
# File 'lib/neo4j_bolt.rb', line 503
# Parses a dictionary from `buf`.
#
# The entry count is either the low nibble of the marker (0xA0..0xAF) or an
# 8/16/32-bit unsigned integer following marker 0xD8/0xD9/0xDA. Each entry
# is a string key (parse_s) followed by an arbitrary value (parse).
#
# @param buf [#next, #next_uint8, #next_uint16, #next_uint32] byte source
# @return [Hash] the parsed entries, keyed by string
# @raise [RuntimeError] if the marker is not a dictionary marker
def parse_dict(buf)
  f = buf.next
  entry_count =
    if f.between?(0xA0, 0xAF)
      f & 0xF
    elsif f == 0xD8
      buf.next_uint8()
    elsif f == 0xD9
      buf.next_uint16()
    elsif f == 0xDA
      buf.next_uint32()
    else
      raise sprintf("unknown string dict %02x", f)
    end
  entries = {}
  entry_count.times do
    key = parse_s(buf)
    entries[key] = parse(buf)
  end
  entries
end
#parse_list(buf) ⇒ Object
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 |
# File 'lib/neo4j_bolt.rb', line 528
# Parses a list from `buf`.
#
# The element count is either the low nibble of the marker (0x90..0x9F) or
# an 8/16/32-bit unsigned integer following marker 0xD4/0xD5/0xD6. Every
# element is parsed with #parse.
#
# The unused local (`v = {}`) of the previous version was removed.
#
# @param buf [#next, #next_uint8, #next_uint16, #next_uint32] byte source
# @return [Array] the parsed elements in order
# @raise [RuntimeError] if the marker is not a list marker
def parse_list(buf)
  f = buf.next
  count = 0
  if f >= 0x90 && f <= 0x9F
    count = f & 0x0F
  elsif f == 0xD4
    count = buf.next_uint8()
  elsif f == 0xD5
    count = buf.next_uint16()
  elsif f == 0xD6
    count = buf.next_uint32()
  else
    raise sprintf("unknown list format %02x", f)
  end
  (0...count).map do
    parse(buf)
  end
end
#parse_s(buf) ⇒ Object
488 489 490 491 492 493 494 495 496 497 498 499 500 501 |
# File 'lib/neo4j_bolt.rb', line 488
# Parses a string from `buf`.
#
# The byte length is either the low nibble of the marker (0x80..0x8F) or an
# 8/16/32-bit unsigned integer following marker 0xD0/0xD1/0xD2. The text
# itself is read with buf.next_s(length).
#
# @param buf [#next, #next_s, #next_uint8, #next_uint16, #next_uint32] byte source
# @return [String] the parsed string
# @raise [CypherError] if the marker is not a string marker
def parse_s(buf)
  f = buf.next
  return buf.next_s(f & 0xF) if f.between?(0x80, 0x8F)

  case f
  when 0xD0
    buf.next_s(buf.next_uint8())
  when 0xD1
    buf.next_s(buf.next_uint16())
  when 0xD2
    buf.next_s(buf.next_uint32())
  else
    raise CypherError.new(sprintf("unknown string format %02x", f), buf)
  end
end
#read_response(&block) ⇒ Object
636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 |
# File 'lib/neo4j_bolt.rb', line 636
# Reads server messages until a summary message (SUCCESS, FAILURE or
# IGNORED) arrives, yielding every message to the optional block.
#
# A FAILURE message is never yielded. Instead a RESET is sent, its reply is
# awaited (moving the state to READY on SUCCESS), and the failure is raised
# as the exception built by #bolt_error.
#
# @yield [message] each parsed message hash (with a :marker entry)
# @return [nil]
# @raise [Exception] the error built by #bolt_error when the server reports a failure
# @raise [UnexpectedServerResponse] if the RESET is not answered with SUCCESS
def read_response(&block)
  loop do
    incoming = BoltBuffer.new(@socket)
    message = parse(incoming)
    incoming.flush()

    if message[:marker] == BoltMarker::BOLT_FAILURE
      # Reset the connection first so it is usable again, then report the
      # original failure.
      append_uint8(0xb0)
      append_token(BoltMarker::BOLT_RESET)
      flush()
      read_response() do |reset_reply|
        unless reset_reply[:marker] == BoltMarker::BOLT_SUCCESS
          raise UnexpectedServerResponse.new(reset_reply[:marker])
        end
        @state.set(ServerState::READY)
      end
      failure = message[:data]
      raise bolt_error(failure['code'], failure['message'])
    end

    yield message if block_given?

    summary_markers = [BoltMarker::BOLT_SUCCESS, BoltMarker::BOLT_FAILURE, BoltMarker::BOLT_IGNORED]
    break if summary_markers.include?(message[:marker])
  end
end
#reset ⇒ Object
284 285 286 287 288 289 290 291 |
# File 'lib/neo4j_bolt.rb', line 284
# Puts the object back into its initial, unconnected state: no socket, no
# open transaction, a fresh State object, and the process/thread identity
# string that #transaction compares against before reusing the connection.
def reset
  @socket = nil
  @neo4j_version = nil
  @state = State.new
  @transaction = 0
  @transaction_failed = false
  @pidtid = "#{Process.pid}/#{Thread.current.object_id}"
end
#run_query(query, data = {}, &block) ⇒ Object
813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 |
# File 'lib/neo4j_bolt.rb', line 813
# Runs a query inside a transaction and yields every result row.
#
# Sends RUN (query, parameters, empty options). Once the server accepts it,
# sends PULL with n = -1 and yields each record as a Hash that maps the
# field names reported by the server to converted values (see #fix_value).
#
# @param query [String] the query text
# @param data [Hash] query parameters
# @yield [entry] one Hash per result row
# @raise [UnexpectedServerResponse] if the server answers with an unexpected message
def run_query(query, data = {}, &block)
  if Neo4jBolt.bolt_verbosity >= 1
    STDERR.puts query
    STDERR.puts data.to_json
    STDERR.puts '-' * 40
  end
  transaction do
    assert(@state == ServerState::TX_READY || @state == ServerState::TX_STREAMING || @state == ServerState::FAILED)
    append_uint8(0xb3)
    append_token(BoltMarker::BOLT_RUN)
    append_s(query)
    # Serialising the parameters can fail (for example when they contain an
    # integer that is out of range). In that case the half-built RUN message
    # is discarded so it is never sent, and the error propagates so that the
    # transaction gets rolled back.
    begin
      append_dict(data)
    rescue
      @buffer = []
      raise
    end
    append_dict({}) # options
    flush()
    read_response do |run_reply|
      run_marker = run_reply[:marker]
      if run_marker == BoltMarker::BOLT_SUCCESS
        @state.set(ServerState::TX_STREAMING)
        keys = run_reply[:data]['fields']
        assert(@state == ServerState::TX_STREAMING)
        append_uint8(0xb1)
        append_token(BoltMarker::BOLT_PULL)
        append_dict({:n => -1})
        flush()
        read_response do |pull_reply|
          pull_marker = pull_reply[:marker]
          if pull_marker == BoltMarker::BOLT_RECORD
            entry = {}
            keys.each_with_index do |key, i|
              entry[key] = fix_value(pull_reply[:data][i])
            end
            if Neo4jBolt.bolt_verbosity >= 1
              STDERR.puts ">>> #{entry.to_json}"
              STDERR.puts '-' * 40
            end
            yield entry
          elsif pull_marker == BoltMarker::BOLT_SUCCESS
            @state.set(ServerState::TX_READY)
          else
            raise UnexpectedServerResponse.new(pull_marker)
          end
        end
      elsif run_marker == BoltMarker::BOLT_FAILURE
        @state.set(ServerState::FAILED)
      else
        raise UnexpectedServerResponse.new(run_marker)
      end
    end
  end
end
#setup_constraints_and_indexes(constraints, indexes) ⇒ Object
893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 |
# File 'lib/neo4j_bolt.rb', line 893
# Creates the requested uniqueness constraints and indexes, then drops every
# constraint / index whose name starts with CONSTRAINT_INDEX_PREFIX but is
# no longer requested.
#
# Each entry must have the form "Label/property". The format check is now
# anchored (\A...\z): the previous unanchored pattern accepted any string
# that merely contained "word/word", and the label and property are
# interpolated directly into the Cypher statements.
#
# @param constraints [Array<String>] "Label/property" pairs to make unique
# @param indexes [Array<String>] "Label/property" pairs to index
# @raise [RuntimeError] if an entry is not exactly "Label/property"
def setup_constraints_and_indexes(constraints, indexes)
  wanted_constraints = Set.new()
  wanted_indexes = Set.new()
  constraints.each do |constraint|
    unless constraint =~ /\A\w+\/\w+\z/
      raise "Unexpected constraint format: #{constraint}"
    end
    constraint_name = "#{CONSTRAINT_INDEX_PREFIX}#{constraint.gsub('/', '_')}"
    wanted_constraints << constraint_name
    label, property = constraint.split('/')
    query = "CREATE CONSTRAINT #{constraint_name} IF NOT EXISTS FOR (n:#{label}) REQUIRE n.#{property} IS UNIQUE"
    neo4j_query(query)
  end
  indexes.each do |index|
    unless index =~ /\A\w+\/\w+\z/
      raise "Unexpected index format: #{index}"
    end
    index_name = "#{CONSTRAINT_INDEX_PREFIX}#{index.gsub('/', '_')}"
    wanted_indexes << index_name
    label, property = index.split('/')
    query = "CREATE INDEX #{index_name} IF NOT EXISTS FOR (n:#{label}) ON (n.#{property})"
    neo4j_query(query)
  end
  # Remove constraints created under our prefix that are no longer wanted.
  neo4j_query("SHOW ALL CONSTRAINTS").each do |row|
    next unless row['name'].start_with?(CONSTRAINT_INDEX_PREFIX)
    next if wanted_constraints.include?(row['name'])
    neo4j_query("DROP CONSTRAINT #{row['name']}")
  end
  # Remove stale indexes. Names that match a wanted constraint are kept as
  # well (presumably the index backing that constraint — verify).
  neo4j_query("SHOW ALL INDEXES").each do |row|
    next unless row['name'].start_with?(CONSTRAINT_INDEX_PREFIX)
    next if wanted_indexes.include?(row['name']) || wanted_constraints.include?(row['name'])
    neo4j_query("DROP INDEX #{row['name']}")
  end
end
#transaction(&block) ⇒ Object
729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 |
# File 'lib/neo4j_bolt.rb', line 729
# Runs the given block inside a transaction.
#
# Calls may be nested: @transaction counts the nesting depth, and only the
# outermost call sends BEGIN and, afterwards, COMMIT or ROLLBACK. If the
# block raises at any depth, the transaction is flagged as failed, the
# outermost call rolls back (when the server is still in TX_READY) and the
# exception propagates.
#
# The connection is re-initialised when the current process/thread differs
# from the one that created it, and opened lazily when there is no socket.
#
# @yield the work to perform inside the transaction
# @raise [UnexpectedServerResponse] if BEGIN, COMMIT or ROLLBACK is answered unexpectedly
def transaction(&block)
  current_pidtid = "#{Process.pid}/#{Thread.current.object_id}"
  reset() if @pidtid != current_pidtid
  connect() if @socket.nil?

  if @transaction == 0
    assert(@state == ServerState::READY)
    append_uint8(0xb1)
    append_token(BoltMarker::BOLT_BEGIN)
    append_dict({})
    flush()
    read_response() do |begin_reply|
      marker = begin_reply[:marker]
      if marker == BoltMarker::BOLT_SUCCESS
        @state.set(ServerState::TX_READY)
        @transaction_failed = false
      elsif marker == BoltMarker::BOLT_FAILURE
        @state.set(ServerState::FAILED)
      else
        raise UnexpectedServerResponse.new(marker)
      end
    end
  end

  @transaction += 1
  begin
    yield
  rescue
    @transaction_failed = true
    raise
  ensure
    @transaction -= 1
    # Only the outermost level rolls back, and only while the server still
    # has the transaction open.
    if @transaction == 0 && @transaction_failed && @state == ServerState::TX_READY
      assert(@state == ServerState::TX_READY)
      append_uint8(0xb0)
      append_token(BoltMarker::BOLT_ROLLBACK)
      flush()
      read_response do |rollback_reply|
        marker = rollback_reply[:marker]
        if marker == BoltMarker::BOLT_SUCCESS
          @state.set(ServerState::READY)
        elsif marker == BoltMarker::BOLT_FAILURE
          @state.set(ServerState::FAILED)
        else
          raise UnexpectedServerResponse.new(marker)
        end
      end
    end
  end

  # Commit once the outermost level finished without a failure.
  return unless (@transaction == 0) && (!@transaction_failed)

  append_uint8(0xb0)
  append_token(BoltMarker::BOLT_COMMIT)
  flush()
  read_response() do |commit_reply|
    unless commit_reply[:marker] == BoltMarker::BOLT_SUCCESS
      raise UnexpectedServerResponse.new(commit_reply[:marker])
    end
    @transaction = 0
    @transaction_failed = false
    @state.set(ServerState::READY)
  end
end