Module: XS::CommonSocketBehavior
- Included in:
- Socket
- Defined in:
- lib/ffi-rxs/socket.rb
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
Class Method Summary collapse
-
.create(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Socket
Allocates a socket of type type for sending and receiving data.
Instance Method Summary collapse
-
#bind(address) ⇒ Object
Binds the socket to an address.
-
#close ⇒ Object
Closes the socket.
-
#connect(address) ⇒ Object
Connects the socket to an address.
-
#initialize(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Socket
Allocates a socket of type type for sending and receiving data.
-
#more_parts? ⇒ Boolean
Convenience method for checking on additional message parts.
-
#recv_multipart(list, routing_envelope, flag = 0) ⇒ Object
Should only be used for XREQ, XREP, DEALER and ROUTER type sockets.
-
#recv_string(string, flag = 0) ⇒ Object
Helper method to make a new #Message instance and convert its payload to a string.
-
#recv_strings(list, flag = 0) ⇒ Object
Receive a multipart message as a list of strings.
-
#recvmsg(message, flag = 0) ⇒ Object
Dequeues a message from the underlying queue.
-
#recvmsgs(list, flag = 0) ⇒ Object
Receive a multipart message as an array of objects (by default these are instances of Message).
-
#send_and_close(message, flag = 0) ⇒ Object
Sends a message.
-
#send_string(string, flag = 0) ⇒ Object
Helper method to make a new #Message instance out of the string passed in for transmission.
-
#send_strings(parts, flag = 0) ⇒ Object
Send a sequence of strings as a multipart message out of the parts passed in for transmission.
-
#sendmsg(message, flag = 0) ⇒ Object
Queues the message for transmission.
-
#sendmsgs(parts, flag = 0) ⇒ Object
Send a sequence of messages as a multipart message out of the parts passed in for transmission.
-
#setsockopt(name, value, length = nil) ⇒ Object
Set the queue options on this socket.
-
#shutdown(endpoint_id) ⇒ Object
Unbinds/disconnects the socket from an endpoint.
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
7 8 9 |
# File 'lib/ffi-rxs/socket.rb', line 7 def name @name end |
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
7 8 9 |
# File 'lib/ffi-rxs/socket.rb', line 7 def socket @socket end |
Class Method Details
.create(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Socket
Allocates a socket of type type for sending and receiving data.
By default, this class uses XS::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use XS::ManagedMessage.
Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as XS::Message.
39 40 41 |
# File 'lib/ffi-rxs/socket.rb', line 39 def self.create context_ptr, type, opts = {:receiver_class => XS::Message} new(context_ptr, type, opts) rescue nil end |
Instance Method Details
#bind(address) ⇒ Object
Binds the socket to an address.
189 190 191 |
# File 'lib/ffi-rxs/socket.rb', line 189 def bind address LibXS.xs_bind @socket, address end |
#close ⇒ Object
Closes the socket. Any unprocessed messages in queue are sent or dropped depending upon the value of the socket option XS::LINGER.
215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/ffi-rxs/socket.rb', line 215 def close if @socket remove_finalizer rc = LibXS.xs_close @socket @socket = nil release_cache rc else 0 end end |
#connect(address) ⇒ Object
Connects the socket to an address.
202 203 204 |
# File 'lib/ffi-rxs/socket.rb', line 202 def connect address LibXS.xs_connect @socket, address end |
#initialize(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Socket
Allocates a socket of type type for sending and receiving data.
To avoid rescuing exceptions, use the factory method #create for all socket creation.
By default, this class uses XS::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use XS::ManagedMessage.
Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as XS::Message.
Creation of a new Socket object can raise an exception. This occurs when the context_ptr is null or when the allocation of the Crossroads socket within the context fails.
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/ffi-rxs/socket.rb', line 79 def initialize context_ptr, type, opts = {:receiver_class => XS::Message} # users may override the classes used for receiving; class must conform to the # same public API as XS::Message @receiver_klass = opts[:receiver_class] context_ptr = context_ptr.pointer if context_ptr.kind_of?(XS::Context) unless context_ptr.null? @socket = LibXS.xs_socket context_ptr, type if @socket && !@socket.null? @name = SocketTypeNameMap[type] else raise ContextError.new 'xs_socket', 0, ETERM, "Socket pointer was null" end else raise ContextError.new 'xs_socket', 0, ETERM, "Context pointer was null" end @longlong_cache = @int_cache = nil @more_parts_array = [] @option_lookup = [] populate_option_lookup define_finalizer end |
#more_parts? ⇒ Boolean
Convenience method for checking on additional message parts.
Equivalent to calling Socket#getsockopt with XS::RCVMORE.
Warning: if the call to #getsockopt fails, this method will return false and swallow the error.
174 175 176 177 178 |
# File 'lib/ffi-rxs/socket.rb', line 174 def more_parts? rc = getsockopt XS::RCVMORE, @more_parts_array Util.resultcode_ok?(rc) ? @more_parts_array.at(0) : false end |
#recv_multipart(list, routing_envelope, flag = 0) ⇒ Object
Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes a list for receiving the message body parts and a routing_envelope for receiving the message parts comprising the 0mq routing information.
472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 |
# File 'lib/ffi-rxs/socket.rb', line 472 def recv_multipart list, routing_envelope, flag = 0 parts = [] rc = recvmsgs parts, flag if Util.resultcode_ok?(rc) routing = true parts.each do |part| if routing routing_envelope << part routing = part.size > 0 else list << part end end end rc end |
#recv_string(string, flag = 0) ⇒ Object
Helper method to make a new #Message instance and convert its payload to a string.
With a -1 return code, the user must check XS.errno to determine the cause.
The application code is responsible for handling the message object lifecycle when #recv returns an error code.
389 390 391 392 393 394 395 |
# File 'lib/ffi-rxs/socket.rb', line 389 def recv_string string, flag = 0 = @receiver_klass.new rc = recvmsg , flag string.replace(.copy_out_string) if Util.resultcode_ok?(rc) .close rc end |
#recv_strings(list, flag = 0) ⇒ Object
Receive a multipart message as a list of strings.
406 407 408 409 410 411 412 413 414 415 416 417 418 |
# File 'lib/ffi-rxs/socket.rb', line 406 def recv_strings list, flag = 0 array = [] rc = recvmsgs array, flag if Util.resultcode_ok?(rc) array.each do || list << .copy_out_string .close end end rc end |
#recvmsg(message, flag = 0) ⇒ Object
Dequeues a message from the underlying queue. By default, this is a blocking operation.
With a -1 return code, the user must check XS.errno to determine the cause.
The application code is responsible for handling the message object lifecycle when #recv returns an error code.
368 369 370 |
# File 'lib/ffi-rxs/socket.rb', line 368 def recvmsg , flag = 0 __recvmsg__(@socket, .address, flag) end |
#recvmsgs(list, flag = 0) ⇒ Object
Receive a multipart message as an array of objects (by default these are instances of Message).
430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 |
# File 'lib/ffi-rxs/socket.rb', line 430 def recvmsgs list, flag = 0 flag = NonBlocking if dontwait?(flag) = @receiver_klass.new rc = recvmsg , flag if Util.resultcode_ok?(rc) list << # check rc *first*; necessary because the call to #more_parts? can reset # the xs_errno to a weird value, so the xs_errno that was set on the # call to #recv gets lost while Util.resultcode_ok?(rc) && more_parts? = @receiver_klass.new rc = recvmsg , flag if Util.resultcode_ok?(rc) list << else .close list.each { |msg| msg.close } list.clear end end else .close end rc end |
#send_and_close(message, flag = 0) ⇒ Object
Sends a message. This will automatically close the message for both successful and failed sends.
With a -1 return code, the user must check XS.errno to determine the cause.
333 334 335 336 337 |
# File 'lib/ffi-rxs/socket.rb', line 333 def send_and_close , flag = 0 rc = sendmsg , flag .close rc end |
#send_string(string, flag = 0) ⇒ Object
Helper method to make a new #Message instance out of the string passed in for transmission.
With a -1 return code, the user must check XS.errno to determine the cause.
260 261 262 263 |
# File 'lib/ffi-rxs/socket.rb', line 260 def send_string string, flag = 0 = Message.new string send_and_close , flag end |
#send_strings(parts, flag = 0) ⇒ Object
Send a sequence of strings as a multipart message out of the parts passed in for transmission. Every element of parts should be a String.
With a -1 return code, the user must check XS.errno to determine the cause.
280 281 282 283 284 285 286 287 288 289 290 |
# File 'lib/ffi-rxs/socket.rb', line 280 def send_strings parts, flag = 0 return -1 if !parts || parts.empty? flag = NonBlocking if dontwait?(flag) parts[0..-2].each do |part| rc = send_string part, (flag | XS::SNDMORE) return rc unless Util.resultcode_ok?(rc) end send_string parts[-1], flag end |
#sendmsg(message, flag = 0) ⇒ Object
Queues the message for transmission. Message is assumed to conform to the same public API as #Message.
With a -1 return code, the user must check XS.errno to determine the cause.
242 243 244 |
# File 'lib/ffi-rxs/socket.rb', line 242 def sendmsg , flag = 0 __sendmsg__(@socket, .address, flag) end |
#sendmsgs(parts, flag = 0) ⇒ Object
Send a sequence of messages as a multipart message out of the parts passed in for transmission. Every element of parts should be a Message (or subclass).
With a -1 return code, the user must check XS.errno to determine the cause.
307 308 309 310 311 312 313 314 315 316 317 |
# File 'lib/ffi-rxs/socket.rb', line 307 def sendmsgs parts, flag = 0 return -1 if !parts || parts.empty? flag = NonBlocking if dontwait?(flag) parts[0..-2].each do |part| rc = sendmsg part, (flag | XS::SNDMORE) return rc unless Util.resultcode_ok?(rc) end sendmsg parts[-1], flag end |
#setsockopt(name, value, length = nil) ⇒ Object
Set the queue options on this socket
With a -1 return code, the user must check XS.errno to determine the cause.
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/ffi-rxs/socket.rb', line 128 def setsockopt name, value, length = nil if 1 == @option_lookup[name] length = 8 pointer = LibC.malloc length pointer.write_long_long value elsif 0 == @option_lookup[name] length = 4 pointer = LibC.malloc length pointer.write_int value elsif 2 == @option_lookup[name] length ||= value.size # note: not checking errno for failed memory allocations :( pointer = LibC.malloc length pointer.write_string value end rc = LibXS.xs_setsockopt @socket, name, pointer, length LibC.free(pointer) unless pointer.nil? || pointer.null? rc end |
#shutdown(endpoint_id) ⇒ Object
Unbinds/disconnects the socket from an endpoint.
348 349 350 |
# File 'lib/ffi-rxs/socket.rb', line 348 def shutdown endpoint_id LibXS.xs_shutdown @socket, endpoint_id end |