Class: Chan::UNIXSocket
Overview
The Chan::UNIXSocket class implements a channel for interprocess communication (IPC) using an unnamed UNIXSocket.
Write methods
-
#send(object) ⇒ Object
(also: #write)
Performs a blocking write.
-
#send_nonblock(object) ⇒ Integer?
(also: #write_nonblock)
Performs a non-blocking write.
Read methods
-
#recv ⇒ Object
(also: #read)
Performs a blocking read.
-
#recv_nonblock ⇒ Object
(also: #read_nonblock)
Performs a non-blocking read.
Size methods
-
#bytes_received ⇒ Integer
(also: #bytes_read)
Returns the total number of bytes read from the channel.
-
#bytes_sent ⇒ Integer
(also: #bytes_written)
Returns the total number of bytes written to the channel.
-
#size ⇒ Integer
Returns the number of objects waiting to be read.
Wait methods
-
#wait_readable(s = nil) ⇒ Chan::UNIXSocket?
Waits for the channel to become readable.
-
#wait_writable(s = nil) ⇒ Chan::UNIXSocket?
Waits for the channel to become writable.
Socket options
-
#getsockopt(target, level, option_name) ⇒ Socket::Option
Returns an instance of Socket::Option.
-
#setsockopt(target, level, option_name, option_value) ⇒ Integer
Returns 0 on success.
Instance Method Summary
-
#close ⇒ void
Closes the channel.
-
#closed? ⇒ Boolean
Returns true when the channel is closed.
-
#empty? ⇒ Boolean
Returns true when the channel is empty.
-
#initialize(serializer, tmpdir: Dir.tmpdir, sock_type: Socket::SOCK_DGRAM) ⇒ Chan::UNIXSocket
constructor
Returns an instance of Chan::UNIXSocket.
-
#serializer ⇒ <#dump, #load>
Returns the serializer used by the channel.
-
#to_a ⇒ Array<Object>
Returns the consumed contents of the channel.
Constructor Details
#initialize(serializer, tmpdir: Dir.tmpdir, sock_type: Socket::SOCK_DGRAM) ⇒ Chan::UNIXSocket
Returns an instance of Chan::UNIXSocket.
    # File 'lib/xchan/unix_socket.rb', line 29
    def initialize(serializer, tmpdir: Dir.tmpdir, sock_type: Socket::SOCK_DGRAM)
      @serializer = Chan.serializers[serializer]&.call || serializer
      @r, @w = ::UNIXSocket.pair(sock_type)
      @bytes = Chan::Bytes.new(tmpdir)
      @lock = LockFile.new Chan.temporary_file("xchan.lock", tmpdir:)
    end
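The constructor pairs two connected, unnamed sockets with a serializer that responds to dump and load. The following is a minimal sketch of that idea using only Ruby's standard library. It is not the xchan implementation: the helper names open_pair and roundtrip are hypothetical, Marshal is an assumed serializer, and the locking and byte bookkeeping done by the real class are left out.

```ruby
require "socket"

# Hypothetical helper (not part of xchan): returns a connected socket
# pair plus the serializer that will be used with it.
def open_pair(serializer = Marshal, sock_type: Socket::SOCK_DGRAM)
  reader, writer = UNIXSocket.pair(sock_type)
  [reader, writer, serializer]
end

# Hypothetical helper: sends one object through the pair and reads it
# back within a single process.
def roundtrip(object)
  reader, writer, serializer = open_pair
  writer.write(serializer.dump(object))
  # Datagram sockets preserve message boundaries, so a single recv call
  # returns one whole serialized object (up to the given maximum size).
  serializer.load(reader.recv(65_536))
ensure
  [reader, writer].compact.each(&:close)
end
```

In real use the reader and writer ends would typically live in different processes, for example on either side of a fork.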
Instance Method Details
#bytes_received ⇒ Integer Also known as: bytes_read
Returns the total number of bytes read from the channel.
    # File 'lib/xchan/unix_socket.rb', line 213
    def bytes_received
      lock { @bytes.stat.bytes_read }
    end
#bytes_sent ⇒ Integer Also known as: bytes_written
Returns the total number of bytes written to the channel.
    # File 'lib/xchan/unix_socket.rb', line 205
    def bytes_sent
      lock { @bytes.stat.bytes_written }
    end
#close ⇒ void
This method returns an undefined value.
Closes the channel.
    # File 'lib/xchan/unix_socket.rb', line 57
    def close
      @lock.lock
      raise IOError, "channel is closed" if closed?
      [@r, @w, @bytes, @lock.file].each(&:close)
    rescue IOError => ex
      @lock.release
      raise(ex)
    end
#closed? ⇒ Boolean
Returns true when the channel is closed.
    # File 'lib/xchan/unix_socket.rb', line 46
    def closed?
      @r.closed? and @w.closed?
    end
#empty? ⇒ Boolean
Returns true when the channel is empty.
    # File 'lib/xchan/unix_socket.rb', line 194
    def empty?
      return true if closed?
      lock { size.zero? }
    end
#getsockopt(target, level, option_name) ⇒ Socket::Option
Returns an instance of Socket::Option.
    # File 'lib/xchan/unix_socket.rb', line 299
    def getsockopt(target, level, option_name)
      @lock.lock
      if !%w[reader writer].include?(target.to_s)
        raise ArgumentError, "target can be ':reader', or ':writer'"
      end
      target = (target == :reader) ? @r : @w
      target.getsockopt(level, option_name)
    ensure
      @lock.release
    end
#recv ⇒ Object Also known as: read
Performs a blocking read.
    # File 'lib/xchan/unix_socket.rb', line 132
    def recv
      recv_nonblock
    rescue Chan::WaitReadable
      wait_readable
      retry
    rescue Chan::WaitLockable
      retry
    end
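The blocking read above is built from the non-blocking one: attempt the read, wait for readability if nothing is available, then retry. A minimal sketch of the same pattern on a plain socket follows. The helper name blocking_read is hypothetical, and the sketch omits the retry on Chan::WaitLockable because it takes no lock.

```ruby
require "socket"
require "io/wait"

# Hypothetical helper: a blocking read expressed as "try, wait, retry".
def blocking_read(io, maxlen = 4096)
  io.read_nonblock(maxlen)
rescue IO::WaitReadable
  # Nothing to read yet: sleep until the descriptor becomes readable,
  # then attempt the read again.
  io.wait_readable
  retry
end
```

Waiting on the descriptor before retrying avoids spinning the CPU while the channel is empty.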
#recv_nonblock ⇒ Object Also known as: read_nonblock
Performs a non-blocking read.
    # File 'lib/xchan/unix_socket.rb', line 156
    def recv_nonblock
      @lock.lock_nonblock
      raise IOError, "closed channel" if closed?
      len = @bytes.shift
      obj = deserialize(@r.read_nonblock(len.zero? ? 1 : len))
      obj.tap { @lock.release }
    rescue IOError => ex
      @lock.release
      raise(ex)
    rescue IO::WaitReadable => ex
      @bytes.unshift(len)
      @lock.release
      raise Chan::WaitReadable, ex.message
    rescue Errno::EAGAIN => ex
      raise Chan::WaitLockable, ex.message
    end
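The read path relies on length bookkeeping: each write records how many bytes it produced, and each read takes the oldest recorded length to know how much to read, putting it back if the read would block. The sketch below illustrates that idea with an in-memory queue. LengthQueue, framed_write and framed_read are hypothetical names, and the in-memory Array is an assumption; how Chan::Bytes actually stores its state is not shown in this page.

```ruby
require "socket"

# Hypothetical in-memory stand-in for the channel's length bookkeeping.
class LengthQueue
  def initialize
    @lengths = []
  end

  def push(len)
    @lengths.push(len)
  end

  # Returns the oldest recorded length, or 0 when nothing is recorded.
  def shift
    @lengths.shift || 0
  end

  def unshift(len)
    @lengths.unshift(len)
  end

  def size
    @lengths.size
  end
end

# Writes a payload and records how many bytes were written.
def framed_write(io, lengths, payload)
  lengths.push(io.write(payload))
end

# Reads exactly one recorded payload without blocking. When the read
# would block, the length is restored so a later attempt can reuse it.
def framed_read(io, lengths)
  len = lengths.shift
  io.read_nonblock(len.zero? ? 1 : len)
rescue IO::WaitReadable
  lengths.unshift(len) unless len.zero?
  raise
end
```

Recording lengths lets a reader recover message boundaries even on a stream socket, where consecutive writes are otherwise merged into one byte stream.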
#send(object) ⇒ Object Also known as: write
Performs a blocking write.
    # File 'lib/xchan/unix_socket.rb', line 80
    def send(object)
      send_nonblock(object)
    rescue Chan::WaitWritable, Chan::WaitLockable
      retry
    end
#send_nonblock(object) ⇒ Integer? Also known as: write_nonblock
Performs a non-blocking write.
    # File 'lib/xchan/unix_socket.rb', line 104
    def send_nonblock(object)
      @lock.lock_nonblock
      raise IOError, "channel closed" if closed?
      len = @w.write_nonblock(serialize(object))
      @bytes.push(len)
      len.tap { @lock.release }
    rescue IOError, IO::WaitWritable, Errno::ENOBUFS => ex
      @lock.release
      raise Chan::WaitWritable, ex.message
    rescue Errno::EWOULDBLOCK => ex
      raise Chan::WaitLockable, ex.message
    end
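A non-blocking write fails fast once the kernel's socket buffer is full, which is the condition the method above reports as Chan::WaitWritable. The sketch below shows that condition on a plain socket. The helper name fill_buffer is hypothetical, and it assumes a stream socket and the exception: false form of write_nonblock rather than the rescue-based form used above.

```ruby
require "socket"

# Hypothetical helper: writes chunks until the kernel buffer is full and
# returns the number of bytes accepted before a write would block.
# The iteration limit guards against looping forever.
def fill_buffer(io, chunk = "x" * 1024, limit: 100_000)
  total = 0
  limit.times do
    result = io.write_nonblock(chunk, exception: false)
    return total if result == :wait_writable
    total += result
  end
  total
end
```

The number of bytes accepted depends on the platform and on socket options such as SO_SNDBUF.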
#serializer ⇒ <#dump, #load>
Returns the serializer used by the channel.
    # File 'lib/xchan/unix_socket.rb', line 39
    def serializer
      @serializer
    end
#setsockopt(target, level, option_name, option_value) ⇒ Integer
Returns 0 on success.
    # File 'lib/xchan/unix_socket.rb', line 276
    def setsockopt(target, level, option_name, option_value)
      @lock.lock
      if !%w[reader writer].include?(target.to_s)
        raise ArgumentError, "target can be ':reader', or ':writer'"
      end
      target = (target == :reader) ? @r : @w
      target.setsockopt(level, option_name, option_value)
    ensure
      @lock.release
    end
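Both socket option methods delegate to the underlying reader or writer socket, so the level, option_name and option_value arguments follow Ruby's BasicSocket conventions. The sketch below shows those underlying calls on a plain socket. The helper name resize_rcvbuf is hypothetical, and SO_RCVBUF is chosen only as an example option.

```ruby
require "socket"

# Hypothetical helper: sets the receive buffer size on a socket, then
# reads the effective value back as an Integer.
def resize_rcvbuf(sock, bytes)
  status = sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, bytes)
  option = sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF)
  # The kernel may round or scale the requested size, so the value read
  # back is not guaranteed to equal the value that was requested.
  [status, option.int]
end
```

With the channel class, the equivalent calls would pass :reader or :writer as the first argument to select which end of the pair is configured.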
#size ⇒ Integer
Returns the number of objects waiting to be read.
    # File 'lib/xchan/unix_socket.rb', line 221
    def size
      lock { @bytes.size }
    end
#to_a ⇒ Array<Object>
Returns the consumed contents of the channel.
    # File 'lib/xchan/unix_socket.rb', line 185
    def to_a
      lock do
        [].tap { _1.push(recv) until empty? }
      end
    end
#wait_readable(s = nil) ⇒ Chan::UNIXSocket?
Waits for the channel to become readable.
    # File 'lib/xchan/unix_socket.rb', line 239
    def wait_readable(s = nil)
      @r.wait_readable(s) and self
    end
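The optional argument is a timeout in seconds: the method returns the channel when it becomes readable and nil when the timeout expires first, which is why the return type is nilable. The sketch below shows the same behaviour on a plain socket. The helper name readable_within? is hypothetical.

```ruby
require "socket"
require "io/wait"

# Hypothetical helper: true when the socket becomes readable within the
# given number of seconds, false when the timeout expires first.
def readable_within?(io, seconds)
  io.wait_readable(seconds) ? true : false
end
```

Passing nil as the timeout, the default in the method above, waits indefinitely.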
#wait_writable(s = nil) ⇒ Chan::UNIXSocket?
Waits for the channel to become writable.
    # File 'lib/xchan/unix_socket.rb', line 251
    def wait_writable(s = nil)
      @w.wait_writable(s) and self
    end