Class: Async::IO::Socket
- Inherits:
-
BasicSocket
- Object
- Wrapper
- Generic
- BasicSocket
- Async::IO::Socket
- Includes:
- Server
- Defined in:
- lib/async/io/socket.rb
Constant Summary
Constants inherited from Generic
Instance Attribute Summary
Attributes inherited from Generic
Class Method Summary collapse
-
.accept(*args, backlog: SOMAXCONN, &block) ⇒ Object
Bind to a local address and accept connections in a loop.
-
.bind(local_address, protocol: 0, task: Task.current, **options, &block) ⇒ Object
Bind to a local address.
-
.build(*args, timeout: nil, reuse_address: true, reuse_port: nil, linger: nil, task: Task.current) ⇒ Object
Build and wrap the underlying io.
-
.connect(remote_address, local_address: nil, task: Task.current, **options) ⇒ Object
Establish a connection to a given `remote_address`.
- .pair(*args) ⇒ Object
Instance Method Summary collapse
- #accept(timeout: nil, task: Task.current) ⇒ Object
- #accept_nonblock ⇒ Object
- #connect(*args) ⇒ Object
- #connect_nonblock ⇒ Object
- #sysaccept ⇒ Object
Methods included from Server
Methods included from Peer
#connected?, #eof, #eof?, #protocol, #sync, #sync=, #type
Methods inherited from Generic
#<<, #connected?, #dup, #nonblock, #nonblock=, #nonblock?, #read, #sysread, #syswrite, #wait, wrap, wrap_blocking_method, wraps, #write
Class Method Details
permalink .accept(*args, backlog: SOMAXCONN, &block) ⇒ Object
Bind to a local address and accept connections in a loop.
177 178 179 180 181 182 183 |
# File 'lib/async/io/socket.rb', line 177
# Bind to a local address and accept connections in a loop.
#
# Delegates to `bind(*args)`; inside the bind block the server socket is put
# into listening mode (only when `backlog` is truthy) and `accept_each` is
# invoked with the caller's block.
#
# @param args [Array] forwarded unchanged to `bind`
# @param backlog [Integer, nil] passed to `listen`; a falsy value skips the `listen` call
# @param block forwarded to `accept_each`
# @return whatever `bind` returns
def self.accept(*args, backlog: SOMAXCONN, &block)
  bind(*args) do |listener, listener_task|
    if backlog
      listener.listen(backlog)
    end

    listener.accept_each(task: listener_task, &block)
  end
end
permalink .bind(local_address, protocol: 0, task: Task.current, **options, &block) ⇒ Object
Bind to a local address.
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/async/io/socket.rb', line 156
# Bind to a local address.
#
# Builds a wrapped socket from the address family, socket type and `protocol`,
# binding the raw socket to `local_address.to_sockaddr` during construction.
#
# Without a block the bound wrapper is returned. With a block, a child task is
# started via `task.async`; the block receives the wrapper and that child task,
# and the wrapper is closed when the block finishes (normally or by raising).
#
# @param local_address an address object responding to `afamily`, `socktype` and `to_sockaddr`
# @param protocol [Integer] passed to `build` as the third positional argument
# @param task the task used to spawn the child task when a block is given
# @param options remaining keyword arguments, forwarded to `build`
# @yield [wrapper, task] the bound wrapper and the child task
# @return the wrapper (no block) or the result of `task.async` (block given)
def self.bind(local_address, protocol: 0, task: Task.current, **options, &block)
  Console.logger.debug(self) { "Binding to #{local_address.inspect}" }

  bound = build(local_address.afamily, local_address.socktype, protocol, **options) do |io|
    io.bind(local_address.to_sockaddr)
  end

  if block_given?
    task.async do |subtask|
      subtask.annotate "binding to #{bound.local_address.inspect}"

      begin
        yield bound, subtask
      ensure
        bound.close
      end
    end
  else
    bound
  end
end
permalink .build(*args, timeout: nil, reuse_address: true, reuse_port: nil, linger: nil, task: Task.current) ⇒ Object
Build and wrap the underlying io.
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/async/io/socket.rb', line 86
# Build and wrap the underlying io.
#
# Creates the raw socket with `wrapped_klass.new(*args)`, applies the requested
# socket options, optionally yields the raw socket for further setup, and then
# wraps it with `self.new(socket, task.reactor)`.
#
# @param args [Array] forwarded to `wrapped_klass.new`
# @param timeout assigned to the wrapper's `timeout` attribute
# @param reuse_address when truthy, sets `SO_REUSEADDR` to 1
# @param reuse_port when truthy, sets `SO_REUSEPORT` to 1
# @param linger when truthy, passed as the value for `SO_LINGER`
# @param task its `reactor` is handed to the wrapper
# @yield [socket] optional; receives the raw socket before it is wrapped
# @return the wrapper around the configured socket
# @raise re-raises any exception after closing the raw socket
def self.build(*args, timeout: nil, reuse_address: true, reuse_port: nil, linger: nil, task: Task.current)
  socket = wrapped_klass.new(*args)

  socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) if reuse_address
  socket.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1) if reuse_port
  socket.setsockopt(SOL_SOCKET, SO_LINGER, linger) if linger

  # The setup block is optional: an unconditional `yield` would raise
  # LocalJumpError for callers that only want a wrapped, unconfigured socket.
  yield socket if block_given?

  wrapper = self.new(socket, task.reactor)
  wrapper.timeout = timeout

  wrapper
rescue Exception
  # Deliberately broad: whatever interrupted construction, the raw socket must
  # not leak. The exception is re-raised untouched after cleanup.
  socket&.close
  raise
end
permalink .connect(remote_address, local_address: nil, task: Task.current, **options) ⇒ Object
Establish a connection to a given `remote_address`.
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/async/io/socket.rb', line 118
# Establish a connection to a given `remote_address`.
#
# Builds a wrapped socket for the remote address's family, type and protocol.
# When `local_address` is given, the raw socket is bound to it first. The
# wrapper is then connected; if connecting raises, the wrapper is closed and
# the exception is re-raised.
#
# Without a block the connected wrapper is returned. With a block, the block
# receives the wrapper and `task`, its value is returned, and the wrapper is
# closed afterwards.
#
# @param remote_address an address object responding to `afamily`, `socktype`, `protocol` and `to_sockaddr`
# @param local_address optional address to bind to before connecting
# @param task annotated with progress and passed to the block
# @param options remaining keyword arguments, forwarded to `build`
# @yield [wrapper, task]
# @return the wrapper (no block) or the block's result
def self.connect(remote_address, local_address: nil, task: Task.current, **options)
  Console.logger.debug(self) { "Connecting to #{remote_address.inspect}" }

  task.annotate "connecting to #{remote_address.inspect}"

  client = build(remote_address.afamily, remote_address.socktype, remote_address.protocol, **options) do |io|
    next unless local_address

    # Inform the kernel (Linux 4.2+) to not reserve an ephemeral port when using bind(2) with a port number of 0. The port will later be automatically chosen at connect(2) time, in a way that allows sharing a source port as long as the 4-tuple is unique.
    io.setsockopt(SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1) if defined?(IP_BIND_ADDRESS_NO_PORT)

    io.bind(local_address.to_sockaddr)
  end

  begin
    client.connect(remote_address.to_sockaddr)
    task.annotate "connected to #{remote_address.inspect} [fd=#{client.fileno}]"
  rescue Exception
    # Close on any failure, then let the original exception propagate.
    client.close
    raise
  end

  if block_given?
    begin
      yield client, task
    ensure
      client.close
    end
  else
    client
  end
end
permalink .pair(*args) ⇒ Object
[View source]
187 188 189 |
# File 'lib/async/io/socket.rb', line 187
# Create a pair of sockets via `::Socket.pair(*args)` and wrap each element of
# the result by passing it to `new`.
#
# @param args [Array] forwarded unchanged to `::Socket.pair`
# @return [Array] the wrapped sockets, in the order `::Socket.pair` returned them
def self.pair(*args)
  raw_sockets = ::Socket.pair(*args)

  raw_sockets.map { |raw| new(raw) }
end
Instance Method Details
permalink #accept(timeout: nil, task: Task.current) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/async/io/socket.rb', line 61
# Accept one incoming connection.
#
# Performs `accept_nonblock` through `async_send`, wraps the accepted peer in a
# `Socket` tied to `task.reactor`, and copies this socket's `timeout` onto it.
#
# Without a block, returns the pair `[wrapper, address]`. With a block, a child
# task is started via `task.async`; the block receives the wrapper and the
# address, and the wrapper is closed when the block finishes.
#
# @param timeout forwarded to `async_send`
# @param task supplies the reactor and spawns the child task
# @yield [wrapper, address]
# @return [Array] `[wrapper, address]` (no block) or the result of `task.async`
def accept(timeout: nil, task: Task.current)
  peer, address = async_send(:accept_nonblock, timeout: timeout)

  connection = Socket.new(peer, task.reactor)
  connection.timeout = self.timeout

  if block_given?
    task.async do |subtask|
      subtask.annotate "incoming connection #{address.inspect} [fd=#{connection.fileno}]"

      begin
        yield connection, address
      ensure
        connection.close
      end
    end
  else
    [connection, address]
  end
end
permalink #accept_nonblock ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/async/io/socket.rb', line 80
# NOTE(review): this listing appears under #accept_nonblock but defines
# `accept` — presumably #accept_nonblock is an alias of #accept; confirm
# against lib/async/io/socket.rb.
#
# Accept one incoming connection.
#
# Performs `accept_nonblock` through `async_send`, wraps the accepted peer in a
# `Socket` tied to `task.reactor`, and copies this socket's `timeout` onto it.
#
# Without a block, returns the pair `[wrapper, address]`. With a block, a child
# task is started via `task.async`; the block receives the wrapper and the
# address, and the wrapper is closed when the block finishes.
#
# @param timeout forwarded to `async_send`
# @param task supplies the reactor and spawns the child task
# @yield [wrapper, address]
# @return [Array] `[wrapper, address]` (no block) or the result of `task.async`
def accept(timeout: nil, task: Task.current)
  peer, address = async_send(:accept_nonblock, timeout: timeout)

  connection = Socket.new(peer, task.reactor)
  connection.timeout = self.timeout

  if block_given?
    task.async do |subtask|
      subtask.annotate "incoming connection #{address.inspect} [fd=#{connection.fileno}]"

      begin
        yield connection, address
      ensure
        connection.close
      end
    end
  else
    [connection, address]
  end
end
permalink #connect(*args) ⇒ Object
50 51 52 53 54 55 56 |
# File 'lib/async/io/socket.rb', line 50
# Connect the socket by running `connect_nonblock` through `async_send`.
#
# `Errno::EISCONN` is swallowed, in which case the method returns nil.
#
# @param args [Array] forwarded to `connect_nonblock`
# @return the result of `async_send`, or nil when `Errno::EISCONN` was raised
def connect(*args)
  async_send(:connect_nonblock, *args)
rescue Errno::EISCONN
  # We are now connected.
  nil
end
permalink #connect_nonblock ⇒ Object
58 59 60 61 62 63 64 |
# File 'lib/async/io/socket.rb', line 58
# NOTE(review): this listing appears under #connect_nonblock but defines
# `connect` — presumably #connect_nonblock is an alias of #connect; confirm
# against lib/async/io/socket.rb.
#
# Connect the socket by running `connect_nonblock` through `async_send`.
#
# `Errno::EISCONN` is swallowed, in which case the method returns nil.
#
# @param args [Array] forwarded to `connect_nonblock`
# @return the result of `async_send`, or nil when `Errno::EISCONN` was raised
def connect(*args)
  async_send(:connect_nonblock, *args)
rescue Errno::EISCONN
  # We are now connected.
  nil
end
permalink #sysaccept ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/async/io/socket.rb', line 81
# NOTE(review): this listing appears under #sysaccept but defines `accept` —
# presumably #sysaccept is an alias of #accept; confirm against
# lib/async/io/socket.rb.
#
# Accept one incoming connection.
#
# Performs `accept_nonblock` through `async_send`, wraps the accepted peer in a
# `Socket` tied to `task.reactor`, and copies this socket's `timeout` onto it.
#
# Without a block, returns the pair `[wrapper, address]`. With a block, a child
# task is started via `task.async`; the block receives the wrapper and the
# address, and the wrapper is closed when the block finishes.
#
# @param timeout forwarded to `async_send`
# @param task supplies the reactor and spawns the child task
# @yield [wrapper, address]
# @return [Array] `[wrapper, address]` (no block) or the result of `task.async`
def accept(timeout: nil, task: Task.current)
  peer, address = async_send(:accept_nonblock, timeout: timeout)

  connection = Socket.new(peer, task.reactor)
  connection.timeout = self.timeout

  if block_given?
    task.async do |subtask|
      subtask.annotate "incoming connection #{address.inspect} [fd=#{connection.fileno}]"

      begin
        yield connection, address
      ensure
        connection.close
      end
    end
  else
    [connection, address]
  end
end