Class: Chan::UNIXSocket

Inherits:
Object show all
Defined in:
lib/xchan/unix_socket.rb

Overview

The Chan::UNIXSocket class implements a channel for interprocess communication (IPC) using an unnamed UNIXSocket.

Write methods collapse

Read methods collapse

Size methods collapse

Wait methods collapse

Socket options collapse

Instance Method Summary collapse

Constructor Details

#initialize(serializer, tmpdir: Dir.tmpdir, sock_type: Socket::SOCK_DGRAM) ⇒ Chan::UNIXSocket

Returns an instance of Chan::UNIXSocket.

Examples:

ch = Chan::UNIXSocket.new(:marshal)
ch.send([1,2,3])
ch.recv.pop # => 3
ch.close

Parameters:

  • serializer (Symbol, <#dump, #load>)

    A serializer.

  • sock_type (Integer) (defaults to: Socket::SOCK_DGRAM)

    A socket type (eg Socket::SOCK_STREAM).

  • tmpdir (String) (defaults to: Dir.tmpdir)

    Path to a directory where temporary files will be stored.



29
30
31
32
33
34
# File 'lib/xchan/unix_socket.rb', line 29

##
# Sets up the channel's serializer, socket pair, byte counter and lock.
#
# @param serializer [Symbol, <#dump, #load>]
#  A key of Chan.serializers, or an object to use directly as the serializer.
# @param tmpdir [String]
#  Directory handed to Chan::Bytes and to Chan.temporary_file.
# @param sock_type [Integer]
#  Socket type passed to ::UNIXSocket.pair.
# @return [Chan::UNIXSocket]
def initialize(serializer, tmpdir: Dir.tmpdir, sock_type: Socket::SOCK_DGRAM)
  # A registered entry is called to obtain the serializer; when there is no
  # entry (or the call yields nil/false) the argument itself is used.
  registered = Chan.serializers[serializer]
  @serializer = registered&.call || serializer
  @r, @w = ::UNIXSocket.pair(sock_type)
  @bytes = Chan::Bytes.new(tmpdir)
  lockfile = Chan.temporary_file("xchan.lock", tmpdir: tmpdir)
  @lock = LockFile.new(lockfile)
end

Instance Method Details

#bytes_received ⇒ Integer Also known as: bytes_read

Returns the total number of bytes read from the channel.

Returns:

  • (Integer)

    Returns the total number of bytes read from the channel.



213
214
215
# File 'lib/xchan/unix_socket.rb', line 213

##
# @return [Integer]
#  The bytes_read value reported by the channel's byte counter,
#  read while the channel lock is held.
def bytes_received
  lock do
    stat = @bytes.stat
    stat.bytes_read
  end
end

#bytes_sent ⇒ Integer Also known as: bytes_written

Returns the total number of bytes written to the channel.

Returns:

  • (Integer)

    Returns the total number of bytes written to the channel.



205
206
207
# File 'lib/xchan/unix_socket.rb', line 205

##
# @return [Integer]
#  The bytes_written value reported by the channel's byte counter,
#  read while the channel lock is held.
def bytes_sent
  lock do
    stat = @bytes.stat
    stat.bytes_written
  end
end

#close ⇒ void

This method returns an undefined value.

Closes the channel.

Raises:

  • (IOError)

    When the channel is already closed.



57
58
59
60
61
62
63
64
# File 'lib/xchan/unix_socket.rb', line 57

##
# Closes the channel.
#
# @raise [IOError]
#  When the channel is already closed.
# @return [void]
def close
  @lock.lock
  raise IOError, "channel is closed" if closed?
  # On success the lock is not released explicitly: the lock's file is
  # closed together with both sockets and the byte counter.
  closables = [@r, @w, @bytes, @lock.file]
  closables.each { |io| io.close }
rescue IOError => err
  # Hand the lock back before propagating the failure.
  @lock.release
  raise err
end

#closed? ⇒ Boolean

Returns true when the channel is closed.

Returns:

  • (Boolean)

    Returns true when the channel is closed.



46
47
48
# File 'lib/xchan/unix_socket.rb', line 46

##
# @return [Boolean]
#  True only when both the reader and the writer socket are closed.
def closed?
  @r.closed? && @w.closed?
end

#empty? ⇒ Boolean

Returns true when the channel is empty.

Returns:

  • (Boolean)

    Returns true when the channel is empty.



194
195
196
197
# File 'lib/xchan/unix_socket.rb', line 194

##
# @return [Boolean]
#  True when the channel is closed, or when its size is zero.
def empty?
  if closed?
    # A closed channel is reported as empty without taking the lock.
    true
  else
    lock { size.zero? }
  end
end

#getsockopt(target, level, option_name) ⇒ Socket::Option

Returns an instance of Socket::Option.

Parameters:

  • target (String, Symbol)

    :reader, or :writer.

  • level (Integer)

    The level (eg Socket::SOL_SOCKET for the socket level).

  • option_name (Integer)

    The name of an option (eg Socket::SO_RCVBUF).

Returns:

  • (Socket::Option)

    Returns an instance of Socket::Option.



299
300
301
302
303
304
305
306
307
308
# File 'lib/xchan/unix_socket.rb', line 299

##
# Reads a socket option from the reader or the writer socket.
#
# @param target [String, Symbol]
#  :reader, or :writer (the String forms "reader" / "writer" are accepted too).
# @param level [Integer]
#  The level (eg Socket::SOL_SOCKET for the socket level).
# @param option_name [Integer]
#  The name of an option (eg Socket::SO_RCVBUF).
# @raise [ArgumentError]
#  When target is neither reader nor writer.
# @return [Socket::Option]
def getsockopt(target, level, option_name)
  @lock.lock
  # Normalise once so validation and selection agree: comparing the raw
  # argument against :reader would route the String "reader" to the writer.
  name = target.to_s
  unless %w[reader writer].include?(name)
    raise ArgumentError, "target can be ':reader', or ':writer'"
  end
  socket = (name == "reader") ? @r : @w
  socket.getsockopt(level, option_name)
ensure
  @lock.release
end

#recv ⇒ Object Also known as: read

Performs a blocking read.

Returns:

  • (Object)

    Returns an object from the channel.

Raises:

  • (IOError)

    When the channel is closed.



132
133
134
135
136
137
138
139
# File 'lib/xchan/unix_socket.rb', line 132

##
# Performs a blocking read by repeating recv_nonblock until it succeeds.
#
# @raise [IOError]
#  When the channel is closed.
# @return [Object]
#  Returns an object from the channel.
def recv
  # A plain `while` is used instead of Kernel#loop, which would swallow a
  # StopIteration raised from inside the read.
  while true
    begin
      return recv_nonblock
    rescue Chan::WaitReadable
      # Nothing to read yet: block until the reader socket is readable.
      wait_readable
    rescue Chan::WaitLockable
      # Lock unavailable: try again straight away.
    end
  end
end

#recv_nonblock ⇒ Object Also known as: read_nonblock

Performs a non-blocking read.

Returns:

  • (Object)

    Returns an object from the channel.

Raises:

  • (IOError)

    When the channel is closed.

  • (Chan::WaitReadable)

    When a read from the underlying IO blocks.

  • (Chan::WaitLockable)

    When a read blocks because of a lock held by another process.



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/xchan/unix_socket.rb', line 156

##
# Performs a non-blocking read.
#
# @raise [IOError]
#  When the channel is closed.
# @raise [Chan::WaitReadable]
#  When a read from the underlying IO blocks.
# @raise [Chan::WaitLockable]
#  When a read blocks because of a lock held by another process.
# @return [Object]
#  Returns an object from the channel.
def recv_nonblock
  @lock.lock_nonblock
  raise IOError, "closed channel" if closed?
  len = @bytes.shift
  # A zero length still asks for one byte (presumably so that an empty
  # channel ends up in the IO::WaitReadable branch below).
  request = len.zero? ? 1 : len
  object = deserialize(@r.read_nonblock(request))
  @lock.release
  object
rescue IOError => err
  @lock.release
  raise err
rescue IO::WaitReadable => err
  # Must stay ahead of the Errno::EAGAIN clause: read_nonblock's would-block
  # error matches both. The length is put back before the lock is released.
  @bytes.unshift(len)
  @lock.release
  raise Chan::WaitReadable, err.message
rescue Errno::EAGAIN => err
  # No release here - presumably raised by lock_nonblock, so the lock was
  # never acquired.
  raise Chan::WaitLockable, err.message
end

#send(object) ⇒ Object Also known as: write

Performs a blocking write.

Parameters:

  • object (Object)

    An object to write to the channel.

Returns:

  • (Object)

    Returns the number of bytes written to the channel.

Raises:

  • (IOError)

    When the channel is closed.



80
81
82
83
84
# File 'lib/xchan/unix_socket.rb', line 80

##
# Performs a blocking write by repeating send_nonblock until it succeeds.
#
# @param object [Object]
#  An object to write to the channel.
# @raise [IOError]
#  When the channel is closed.
# @return [Object]
#  Returns the number of bytes written to the channel.
def send(object)
  send_nonblock(object)
rescue Chan::WaitWritable
  # Block until the writer socket reports writable instead of spinning,
  # mirroring how recv waits on wait_readable.
  wait_writable
  retry
rescue Chan::WaitLockable
  retry
end

#send_nonblock(object) ⇒ Integer? Also known as: write_nonblock

Performs a non-blocking write.

Parameters:

  • object (Object)

    An object to write to the channel.

Returns:

  • (Integer, nil)

    Returns the number of bytes written to the channel.

Raises:

  • (IOError)

    When the channel is closed.

  • (Chan::WaitWritable)

    When a write to the underlying IO blocks.

  • (Chan::WaitLockable)

    When a write blocks because of a lock held by another process.



104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/xchan/unix_socket.rb', line 104

##
# Performs a non-blocking write.
#
# @param object [Object]
#  An object to write to the channel.
# @raise [IOError]
#  When the channel is closed.
# @raise [Chan::WaitWritable]
#  When a write to the underlying IO blocks.
# @raise [Chan::WaitLockable]
#  When a write blocks because of a lock held by another process.
# @return [Integer, nil]
#  Returns the number of bytes written to the channel.
def send_nonblock(object)
  @lock.lock_nonblock
  raise IOError, "channel closed" if closed?
  len = @w.write_nonblock(serialize(object))
  @bytes.push(len)
  len.tap { @lock.release }
rescue IOError => ex
  # Propagate IOError unchanged (as recv_nonblock does). Translating it to
  # Chan::WaitWritable would hide the documented error and make a blocking
  # caller retry forever on a closed channel.
  @lock.release
  raise(ex)
rescue IO::WaitWritable, Errno::ENOBUFS => ex
  # Must stay ahead of the Errno::EWOULDBLOCK clause: write_nonblock's
  # would-block error can match both.
  @lock.release
  raise Chan::WaitWritable, ex.message
rescue Errno::EWOULDBLOCK => ex
  # No release here - presumably raised by lock_nonblock, so the lock was
  # never acquired.
  raise Chan::WaitLockable, ex.message
end

#serializer ⇒ <#dump, #load>

Returns the serializer used by the channel.

Returns:

  • (<#dump, #load>)

    Returns the serializer used by the channel.



39
40
41
# File 'lib/xchan/unix_socket.rb', line 39

##
# @return [<#dump, #load>]
#  Returns the serializer used by the channel.
def serializer = @serializer

#setsockopt(target, level, option_name, option_value) ⇒ Integer

Returns 0 on success.

Parameters:

  • target (String, Symbol)

    :reader, or :writer.

  • level (Integer)

    The level (eg Socket::SOL_SOCKET for the socket level).

  • option_name (Integer)

    The name of an option (eg Socket::SO_RCVBUF).

  • option_value (Boolean, Integer)

    The option value (eg 12345)

Returns:

  • (Integer)

    Returns 0 on success.



276
277
278
279
280
281
282
283
284
285
# File 'lib/xchan/unix_socket.rb', line 276

##
# Sets a socket option on the reader or the writer socket.
#
# @param target [String, Symbol]
#  :reader, or :writer (the String forms "reader" / "writer" are accepted too).
# @param level [Integer]
#  The level (eg Socket::SOL_SOCKET for the socket level).
# @param option_name [Integer]
#  The name of an option (eg Socket::SO_RCVBUF).
# @param option_value [Boolean, Integer]
#  The option value (eg 12345).
# @raise [ArgumentError]
#  When target is neither reader nor writer.
# @return [Integer]
#  Returns 0 on success.
def setsockopt(target, level, option_name, option_value)
  @lock.lock
  # Normalise once so validation and selection agree: comparing the raw
  # argument against :reader would route the String "reader" to the writer.
  name = target.to_s
  unless %w[reader writer].include?(name)
    raise ArgumentError, "target can be ':reader', or ':writer'"
  end
  socket = (name == "reader") ? @r : @w
  socket.setsockopt(level, option_name, option_value)
ensure
  @lock.release
end

#size ⇒ Integer

Returns the number of objects waiting to be read.

Returns:

  • (Integer)

    Returns the number of objects waiting to be read.



221
222
223
# File 'lib/xchan/unix_socket.rb', line 221

##
# @return [Integer]
#  The size reported by the channel's byte counter, read while the
#  channel lock is held.
def size
  lock do
    @bytes.size
  end
end

#to_a ⇒ Array<Object>

Returns the consumed contents of the channel.

Examples:

ch = xchan
1.upto(4) { ch.send(_1) }
ch.to_a.last # => 4

Returns:

  • (Array<Object>)

    Returns the consumed contents of the channel.



185
186
187
188
189
# File 'lib/xchan/unix_socket.rb', line 185

##
# Drains the channel: reads objects until the channel reports empty.
#
# @return [Array<Object>]
#  The objects read, in the order they were received.
def to_a
  lock do
    drained = []
    drained.push(recv) until empty?
    drained
  end
end

#wait_readable(s = nil) ⇒ Chan::UNIXSocket?

Waits for the channel to become readable.

Parameters:

  • s (Float, Integer, nil) (defaults to: nil)

    The number of seconds to wait. Waits indefinitely when "nil".

Returns:

  • (Chan::UNIXSocket, nil)

    Returns self when the channel is readable, otherwise returns nil.



239
240
241
# File 'lib/xchan/unix_socket.rb', line 239

##
# Waits for the reader socket to become readable.
#
# @param s [Float, Integer, nil]
#  The number of seconds to wait; passed straight through to the socket.
# @return [Chan::UNIXSocket, nil]
#  Returns self when the socket's wait returns a truthy value, otherwise
#  that falsy value.
def wait_readable(s = nil)
  ready = @r.wait_readable(s)
  ready && self
end

#wait_writable(s = nil) ⇒ Chan::UNIXSocket?

Waits for the channel to become writable.

Parameters:

  • s (Float, Integer, nil) (defaults to: nil)

    The number of seconds to wait. Waits indefinitely when "nil".

Returns:

  • (Chan::UNIXSocket, nil)

    Returns self when the channel is writable, otherwise returns nil.



251
252
253
# File 'lib/xchan/unix_socket.rb', line 251

##
# Waits for the writer socket to become writable.
#
# @param s [Float, Integer, nil]
#  The number of seconds to wait; passed straight through to the socket.
# @return [Chan::UNIXSocket, nil]
#  Returns self when the socket's wait returns a truthy value, otherwise
#  that falsy value.
def wait_writable(s = nil)
  ready = @w.wait_writable(s)
  ready && self
end