Module: XS::CommonSocketBehavior

Included in:
Socket
Defined in:
lib/ffi-rxs/socket.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



7
8
9
# File 'lib/ffi-rxs/socket.rb', line 7

def name
  @name
end

#socketObject (readonly)

Returns the value of attribute socket.



7
8
9
# File 'lib/ffi-rxs/socket.rb', line 7

def socket
  @socket
end

Class Method Details

.create(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Socket

Allocates a socket of type type for sending and receiving data.

By default, this class uses XS::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use XS::ManagedMessage.

Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as XS::Message.

Examples:

Socket creation

sock = Socket.create(Context.create, XS::REQ, :receiver_class => XS::ManagedMessage)
if (socket = Socket.new(context.pointer, XS::REQ))
  ...
else
  STDERR.puts "Socket creation failed"
end

Parameters:

  • pointer
  • type (Constant)

    One of XS::REQ, XS::REP, XS::PUB, XS::SUB, XS::PAIR, XS::PULL, XS::PUSH, XS::XREQ, XS::REP, XS::DEALER or XS::ROUTER

  • options (Hash)

Returns:

  • (Socket)

    when successful

  • nil when unsuccessful



39
40
41
# File 'lib/ffi-rxs/socket.rb', line 39

def self.create context_ptr, type, opts = {:receiver_class => XS::Message}
  new(context_ptr, type, opts) rescue nil
end

Instance Method Details

#bind(address) ⇒ Object

Binds the socket to an address.

Examples:

socket.bind("tcp://127.0.0.1:5555")

Parameters:

  • address

Returns:

  • 0 or greater if successful

  • (0 if unsuccessful)

    0 if unsuccessful



189
190
191
# File 'lib/ffi-rxs/socket.rb', line 189

def bind address
  LibXS.xs_bind @socket, address
end

#closeObject

Closes the socket. Any unprocessed messages in queue are sent or dropped depending upon the value of the socket option XS::LINGER.

Examples:

rc = socket.close
puts("Given socket was invalid!") unless 0 == rc

Returns:

  • 0 upon success or when the socket has already been closed

  • -1 when the operation fails. Check XS.errno for the error code



215
216
217
218
219
220
221
222
223
224
225
# File 'lib/ffi-rxs/socket.rb', line 215

def close
  if @socket
    remove_finalizer
    rc = LibXS.xs_close @socket
    @socket = nil
    release_cache
    rc
  else
    0
  end
end

#connect(address) ⇒ Object

Connects the socket to an address.

Examples:

rc = socket.connect("tcp://127.0.0.1:5555")

Parameters:

  • address

Returns:

  • 0 or greater if successful

  • (0 if unsuccessful)

    0 if unsuccessful



202
203
204
# File 'lib/ffi-rxs/socket.rb', line 202

def connect address
  LibXS.xs_connect @socket, address
end

#initialize(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Socket

Allocates a socket of type type for sending and receiving data.

To avoid rescuing exceptions, use the factory method #create for all socket creation.

By default, this class uses XS::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use XS::ManagedMessage.

Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as XS::Message.

Creation of a new Socket object can raise an exception. This occurs when the context_ptr is null or when the allocation of the Crossroads socket within the context fails.

Examples:

Socket creation

sock = Socket.new(Context.new, XS::REQ, :receiver_class => XS::ManagedMessage)
begin
  socket = Socket.new(context.pointer, XS::REQ)
rescue ContextError => e
  # error handling
end

Parameters:

  • pointer
  • type (Constant)

    One of XS::REQ, XS::REP, XS::PUB, XS::SUB, XS::PAIR, XS::PULL, XS::PUSH, XS::XREQ, XS::REP, XS::DEALER or XS::ROUTER

  • options (Hash)

Returns:

  • (Socket)

    when successful

  • nil when unsuccessful



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/ffi-rxs/socket.rb', line 79

def initialize context_ptr, type, opts = {:receiver_class => XS::Message}
  # users may override the classes used for receiving; class must conform to the
  # same public API as XS::Message
  @receiver_klass = opts[:receiver_class]

  context_ptr = context_ptr.pointer if context_ptr.kind_of?(XS::Context)

  unless context_ptr.null?
    @socket = LibXS.xs_socket context_ptr, type
    if @socket && !@socket.null?
      @name = SocketTypeNameMap[type]
    else
      raise ContextError.new 'xs_socket', 0, ETERM, "Socket pointer was null"
    end
  else
    raise ContextError.new 'xs_socket', 0, ETERM, "Context pointer was null"
  end

  @longlong_cache = @int_cache = nil
  @more_parts_array = []
  @option_lookup = []
  populate_option_lookup

  define_finalizer
end

#more_parts?Boolean

Convenience method for checking on additional message parts.

Equivalent to calling Socket#getsockopt with XS::RCVMORE.

Warning: if the call to #getsockopt fails, this method will return false and swallow the error.

Examples:

message_parts = []
message = Message.new
rc = socket.recvmsg(message)
if XS::Util.resultcode_ok?(rc)
  message_parts << message
  while more_parts?
    message = Message.new
    rc = socket.recvmsg(message)
    message_parts.push(message) if resultcode_ok?(rc)
  end
end

Returns:

  • (Boolean)

    true if more message parts

  • false if not



174
175
176
177
178
# File 'lib/ffi-rxs/socket.rb', line 174

def more_parts?
  rc = getsockopt XS::RCVMORE, @more_parts_array

  Util.resultcode_ok?(rc) ? @more_parts_array.at(0) : false
end

#recv_multipart(list, routing_envelope, flag = 0) ⇒ Object

Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes a list for receiving the message body parts and a routing_envelope for receiving the message parts comprising the 0mq routing information.

Parameters:

  • list (Array)
  • routing_envelope
  • flag (defaults to: 0)

    One of 0 (default) and XS::NonBlocking

Returns:

  • 0 if successful

  • -1 if unsuccessful



472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
# File 'lib/ffi-rxs/socket.rb', line 472

def recv_multipart list, routing_envelope, flag = 0
  parts = []
  rc = recvmsgs parts, flag

  if Util.resultcode_ok?(rc)
    routing = true
    parts.each do |part|
      if routing
        routing_envelope << part
        routing = part.size > 0
      else
        list << part
      end
    end
  end

  rc
end

#recv_string(string, flag = 0) ⇒ Object

Helper method to make a new #Message instance and convert its payload to a string.

With a -1 return code, the user must check XS.errno to determine the cause.

The application code is responsible for handling the message object lifecycle when #recv returns an error code.

Parameters:

  • string
  • flag (defaults to: 0)

    One of 0 (default) and XS::NonBlocking

Returns:

  • 0 when the message was successfully dequeued

  • -1 under two conditions 1. The message could not be dequeued 2. When flag is set with XS::NonBlocking and the socket returned EAGAIN



389
390
391
392
393
394
395
# File 'lib/ffi-rxs/socket.rb', line 389

def recv_string string, flag = 0
  message = @receiver_klass.new
  rc = recvmsg message, flag
  string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
  message.close
  rc
end

#recv_strings(list, flag = 0) ⇒ Object

Receive a multipart message as a list of strings.

Parameters:

  • list (Array)
  • flag (defaults to: 0)

    One of 0 (default) and XS::NonBlocking. Any other flag will be removed.

Returns:

  • 0 if successful

  • -1 if unsuccessful



406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/ffi-rxs/socket.rb', line 406

def recv_strings list, flag = 0
  array = []
  rc = recvmsgs array, flag

  if Util.resultcode_ok?(rc)
    array.each do |message|
      list << message.copy_out_string
      message.close
    end
  end

  rc
end

#recvmsg(message, flag = 0) ⇒ Object

Dequeues a message from the underlying queue. By default, this is a blocking operation.

With a -1 return code, the user must check XS.errno to determine the cause.

The application code is responsible for handling the message object lifecycle when #recv returns an error code.

Parameters:

  • message
  • flag (defaults to: 0)

    One of 0 (default) - blocking operation and XS::NonBlocking - non-blocking operation

Returns:

  • 0 when the message was successfully dequeued

  • -1 under two conditions 1. The message could not be dequeued 2. When flags is set with XS::NonBlocking and the socket returned EAGAIN



368
369
370
# File 'lib/ffi-rxs/socket.rb', line 368

def recvmsg message, flag = 0
  __recvmsg__(@socket, message.address, flag)
end

#recvmsgs(list, flag = 0) ⇒ Object

Receive a multipart message as an array of objects (by default these are instances of Message).

Parameters:

  • list (Array)
  • flag (defaults to: 0)

    One of 0 (default) and XS::NonBlocking. Any other flag will be removed.

Returns:

  • 0 if successful

  • -1 if unsuccessful



430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
# File 'lib/ffi-rxs/socket.rb', line 430

def recvmsgs list, flag = 0
  flag = NonBlocking if dontwait?(flag)

  message = @receiver_klass.new
  rc = recvmsg message, flag

  if Util.resultcode_ok?(rc)
    list << message

    # check rc *first*; necessary because the call to #more_parts? can reset
    # the xs_errno to a weird value, so the xs_errno that was set on the
    # call to #recv gets lost
    while Util.resultcode_ok?(rc) && more_parts?
      message = @receiver_klass.new
      rc = recvmsg message, flag

      if Util.resultcode_ok?(rc)
        list << message
      else
        message.close
        list.each { |msg| msg.close }
        list.clear
      end
    end
  else
    message.close
  end

  rc
end

#send_and_close(message, flag = 0) ⇒ Object

Sends a message. This will automatically close the message for both successful and failed sends.

With a -1 return code, the user must check XS.errno to determine the cause.

Parameters:

  • message
  • flag (defaults to: 0)

    One of 0 (default) and @XS::NonBlocking

Returns:

  • 0 when the message was successfully enqueued

  • -1 under two conditions 1. The message could not be enqueued 2. When flag is set with XS::NonBlocking and the socket returned EAGAIN.



333
334
335
336
337
# File 'lib/ffi-rxs/socket.rb', line 333

def send_and_close message, flag = 0
  rc = sendmsg message, flag
  message.close
  rc
end

#send_string(string, flag = 0) ⇒ Object

Helper method to make a new #Message instance out of the string passed in for transmission.

With a -1 return code, the user must check XS.errno to determine the cause.

Parameters:

  • message
  • flag (defaults to: 0)

    One of 0 (default), XS::NonBlocking and XS::SNDMORE

Returns:

  • 0 when the message was successfully enqueued

  • -1 under two conditions 1. The message could not be enqueued 2. When flag is set with XS::NonBlocking and the socket returned EAGAIN.



260
261
262
263
# File 'lib/ffi-rxs/socket.rb', line 260

def send_string string, flag = 0
  message = Message.new string
  send_and_close message, flag
end

#send_strings(parts, flag = 0) ⇒ Object

Send a sequence of strings as a multipart message out of the parts passed in for transmission. Every element of parts should be a String.

With a -1 return code, the user must check XS.errno to determine the cause.

Parameters:

  • parts (Array)
  • flag (defaults to: 0)

    One of 0 (default) and XS::NonBlocking

Returns:

  • 0 when the messages were successfully enqueued

  • -1 under two conditions 1. A message could not be enqueued 2. When flag is set with XS::NonBlocking and the socket returned EAGAIN.



280
281
282
283
284
285
286
287
288
289
290
# File 'lib/ffi-rxs/socket.rb', line 280

def send_strings parts, flag = 0
  return -1 if !parts || parts.empty?
  flag = NonBlocking if dontwait?(flag)

  parts[0..-2].each do |part|
    rc = send_string part, (flag | XS::SNDMORE)
    return rc unless Util.resultcode_ok?(rc)
  end

  send_string parts[-1], flag
end

#sendmsg(message, flag = 0) ⇒ Object

Queues the message for transmission. Message is assumed to conform to the same public API as #Message.

With a -1 return code, the user must check XS.errno to determine the cause.

Parameters:

  • message
  • flag (defaults to: 0)

    One of 0 (default) - blocking operation, XS::NonBlocking - non-blocking operation, XS::SNDMORE - this message is part of a multi-part message

Returns:

  • 0 when the message was successfully enqueued

  • -1 under two conditions 1. The message could not be enqueued 2. When flag is set with XS::NonBlocking and the socket returned EAGAIN.



242
243
244
# File 'lib/ffi-rxs/socket.rb', line 242

def sendmsg message, flag = 0
  __sendmsg__(@socket, message.address, flag)
end

#sendmsgs(parts, flag = 0) ⇒ Object

Send a sequence of messages as a multipart message out of the parts passed in for transmission. Every element of parts should be a Message (or subclass).

With a -1 return code, the user must check XS.errno to determine the cause.

Parameters:

  • parts (Array)
  • flag (defaults to: 0)

    One of 0 (default) and XS::NonBlocking

Returns:

  • 0 when the messages were successfully enqueued

  • -1 under two conditions 1. A message could not be enqueued 2. When flag is set with XS::NonBlocking and the socket returned EAGAIN



307
308
309
310
311
312
313
314
315
316
317
# File 'lib/ffi-rxs/socket.rb', line 307

def sendmsgs parts, flag = 0
  return -1 if !parts || parts.empty?
  flag = NonBlocking if dontwait?(flag)

  parts[0..-2].each do |part|
    rc = sendmsg part, (flag | XS::SNDMORE)
    return rc unless Util.resultcode_ok?(rc)
  end

  sendmsg parts[-1], flag
end

#setsockopt(name, value, length = nil) ⇒ Object

Set the queue options on this socket

With a -1 return code, the user must check XS.errno to determine the cause.

Examples:

rc = socket.setsockopt(XS::LINGER, 1_000)
XS::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed")

Parameters:

  • name (Constant)

    numeric values One of XS::AFFINITY, XS::RATE, XS::RECOVERY_IVL, XS::LINGER, XS::RECONNECT_IVL, XS::BACKLOG, XS::RECONNECT_IVL_MAX, XS::MAXMSGSIZE, XS::SNDHWM, XS::RCVHWM, XS::MULTICAST_HOPS, XS::RCVTIMEO, XS::SNDTIMEO, XS::IPV4ONLY, XS::KEEPALIVE, XS::SUBSCRIBE, XS::UNSUBSCRIBE, XS::IDENTITY, XS::SNDBUF, XS::RCVBUF

  • name (Constant)

    string values One of XS::IDENTITY, XS::SUBSCRIBE or XS::UNSUBSCRIBE

  • value

Returns:

  • 0 when the operation completed successfully

  • -1 when this operation fails



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/ffi-rxs/socket.rb', line 128

def setsockopt name, value, length = nil
  if 1 == @option_lookup[name]
    length = 8
    pointer = LibC.malloc length
    pointer.write_long_long value

  elsif 0 == @option_lookup[name]
    length = 4
    pointer = LibC.malloc length
    pointer.write_int value

  elsif 2 == @option_lookup[name]
    length ||= value.size

    # note: not checking errno for failed memory allocations :(
    pointer = LibC.malloc length
    pointer.write_string value
  end

  rc = LibXS.xs_setsockopt @socket, name, pointer, length
  LibC.free(pointer) unless pointer.nil? || pointer.null?
  rc
end

#shutdown(endpoint_id) ⇒ Object

Unbinds/disconnects the socket from an endpoint.

Examples:

socket.shutdown(endpoint_id)

Parameters:

Returns:

  • 0 if successful

  • -1 if unsuccessful



348
349
350
# File 'lib/ffi-rxs/socket.rb', line 348

def shutdown endpoint_id
  LibXS.xs_shutdown @socket, endpoint_id
end