Class: Volt::MessageBus::PeerConnection
- Defined in:
- lib/volt/server/message_bus/peer_to_peer/peer_connection.rb
Constant Summary collapse
- CONNECT_TIMEOUT =
2
Instance Attribute Summary collapse
-
#peer_server_id ⇒ Object
readonly
The server id for the connected server.
-
#socket ⇒ Object
readonly
The socket used to communicate with the connected server.
Instance Method Summary collapse
-
#announce ⇒ Object
Tells the other connection its server_id.
-
#disconnect! ⇒ Object
Close the socket, kill listener thread, wait for worker thread to send all messages, and remove from message_bus’s peer_connections.
-
#initialize(socket, ips, port, message_bus, server = false, peer_server_id = nil) ⇒ PeerConnection
constructor
A new instance of PeerConnection.
- #listen ⇒ Object
- #publish(message) ⇒ Object
- #run_worker ⇒ Object
Constructor Details
#initialize(socket, ips, port, message_bus, server = false, peer_server_id = nil) ⇒ PeerConnection
Returns a new instance of PeerConnection.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 17
#
# Stores the connection state and starts the worker thread. The worker
# connects (when no socket was handed in), announces itself to the peer,
# starts the listener thread and then drains the outgoing message queue.
#
# @param socket [Object, nil] an already-open socket, or nil to have the
#   worker thread establish one through reconnect!
# @param ips [Object] stored in @ips; not read by this method
# @param port [Object] stored in @port; not read by this method
# @param message_bus [Object] the owning message bus; must respond to
#   server_id
# @param server [Boolean] stored in @server; announce and listen branch on it
# @param peer_server_id [Object, nil] the peer's server id, when already known
def initialize(socket, ips, port, message_bus, server=false, peer_server_id=nil)
  @message_bus = message_bus
  @ips = ips
  @port = port
  @server = server
  @socket = socket
  @server_id = message_bus.server_id
  @peer_server_id = peer_server_id

  # Bounded queue of outgoing messages; publish blocks once it holds 500.
  @message_queue = SizedQueue.new(500)
  @reconnect_mutex = Mutex.new

  # Set explicitly so the threads started below never read an unset flag.
  @disconnected = false

  # The encoder handles things like formatting and encryption
  @message_encoder = MessageEncoder.new

  @worker_thread = Thread.new do
    # Connect to the remote if this PeerConnection was created from the
    # active_volt_instances collection.
    #
    # reconnect! will setup the @socket
    if @socket || reconnect!
      # Announce checks to make sure we didn't connect to ourselves
      if announce
        # Set up the listen thread.
        @listen_thread = Thread.new do
          # Listen for messages in a new thread
          listen
        end

        run_worker
      end
    end
  end
end
Instance Attribute Details
#peer_server_id ⇒ Object (readonly)
The server id for the connected server
15 16 17 |
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 15
#
# The server id for the connected server.
#
# @return [Object] the current value of @peer_server_id
def peer_server_id
  @peer_server_id
end
#socket ⇒ Object (readonly)
The socket used to communicate with the connected server
15 16 17 |
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 15
#
# The socket this connection reads from and writes to.
#
# @return [Object, nil] the current value of @socket
def socket
  @socket
end
Instance Method Details
#announce ⇒ Object
Tells the other connection its server_id. In the event we connected to ourselves, close.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 54
#
# Exchanges server ids with the peer. When @server is set we wait for the
# peer's id first and then reply with ours; otherwise we send ours first.
# If the exchange fails, or the peer turns out to be this very server, the
# connection is closed.
#
# NOTE(review): the encoder method names were missing from the extracted
# listing; receive_message / send_message are reconstructed - confirm against
# MessageEncoder.
#
# @return [Boolean] true when the handshake succeeded, false when the
#   connection was closed
def announce
  failed = false

  begin
    if @server
      # Wait for announcement
      @peer_server_id = @message_encoder.receive_message(@socket)
      @message_encoder.send_message(@socket, @server_id)
    else
      # Announce
      @message_encoder.send_message(@socket, @server_id)
      @peer_server_id = @message_encoder.receive_message(@socket)
    end
  rescue Errno::ECONNRESET, Errno::ENETUNREACH, Errno::EPIPE, IOError
    # Same failure set that listen treats as a dropped connection, so a reset
    # during the handshake is reported as a failed announce instead of
    # escaping into the calling thread.
    failed = true
  end

  # Make sure we aren't already connected
  @message_bus.remove_duplicate_connections

  # Don't connect to self
  if failed || @peer_server_id == @server_id
    # Close the connection
    disconnect!
    return false
  end

  # Success
  true
end
#disconnect! ⇒ Object
Close the socket, kill listener thread, wait for worker thread to send all messages, and remove from message_bus’s peer_connections.
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 86
#
# Close the socket, kill the listener thread, wait for the worker thread to
# send all queued messages, and remove this connection from the message bus.
#
# @return [Object] whatever @message_bus.remove_peer_connection returns
def disconnect!
  @disconnected = true

  # The worker thread is the consumer of the bounded queue. When we are
  # running on that thread and the queue is full, a blocking push could never
  # return, so push without blocking in that case.
  on_worker = Thread.current == @worker_thread
  begin
    @message_queue.push(:QUIT, on_worker)
  rescue ThreadError
    # Queue is full and we are its consumer; the :QUIT marker is dropped.
  end

  begin
    @socket.close
  rescue
    # Ignore close error, since we may not be connected
  end

  @listen_thread.kill if @listen_thread

  # Wait for the worker to publish all messages (never join ourselves)
  @worker_thread.join if @worker_thread && !on_worker

  @message_bus.remove_peer_connection(self)
end
#listen ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 126
#
# Reads messages from the socket and hands each one to the message bus until
# the connection ends. When the connection drops and @server is not set, a
# reconnect is attempted; listening resumes if it succeeds.
#
# NOTE(review): the local variable and the receive_message / handle_message
# method names were missing from the extracted listing and are reconstructed -
# confirm against MessageEncoder and the message bus.
def listen
  loop do
    begin
      while (message = @message_encoder.receive_message(@socket))
        break if @disconnected
        @message_bus.handle_message(message)
      end

      # Got nil from socket
    rescue Errno::ECONNRESET, Errno::ENETUNREACH, Errno::EPIPE, IOError
      # handle below
    end

    # Stop when we were disconnected on purpose or @server is set.
    break if @disconnected || @server

    # Connection was dropped, try to reconnect. Couldn't reconnect, die.
    break unless reconnect!
  end
end
#publish(message) ⇒ Object
104 105 106 |
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 104
#
# Queues a message for the worker thread to send to the peer.
#
# @param message [Object] the message to enqueue
# @return [Object] the result of pushing onto @message_queue
def publish(message)
  @message_queue.push(message)
end
#run_worker ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 108
#
# Pops messages off the outgoing queue and sends them over the socket until
# the :QUIT marker is popped. When sending fails with a connection error the
# worker tries to reconnect and resends the same message; if the reconnect
# fails the worker stops.
#
# NOTE(review): the local variable and the send_message method name were
# missing from the extracted listing and are reconstructed - confirm against
# MessageEncoder.
def run_worker
  while (message = @message_queue.pop)
    break if message == :QUIT

    begin
      @message_encoder.send_message(@socket, message)
    rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::ENETUNREACH, Errno::EPIPE, IOError
      # 'Error: closed stream' comes in sometimes
      retry if reconnect!

      # Unable to reconnect, die
      break
    end
  end
end