Class: ZMQ::Socket
- Inherits: Object
  - Object
  - ZMQ::Socket
- Defined in: lib/ffi-rzmq/socket.rb
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
- #socket ⇒ Object (readonly)
  Returns the value of attribute socket.
Class Method Summary
- .create(context_ptr, type, opts = {:receiver_class => ZMQ::Message}) ⇒ Object
  Allocates a socket of type type for sending and receiving data.
Instance Method Summary
- #bind(address) ⇒ Object
  Binds the socket to an address.
- #close ⇒ Object
  Closes the socket.
- #connect(address) ⇒ Object
  Connects the socket to an address.
- #disconnect(endpoint) ⇒ Object
  Disconnect the socket from the given endpoint.
- #getsockopt(name, array) ⇒ Object
  Get the options set on this socket.
- #identity ⇒ Object
  Convenience method for getting the value of the socket IDENTITY.
- #identity=(value) ⇒ Object
  Convenience method for setting the value of the socket IDENTITY.
- #initialize(context_ptr, type, opts = {:receiver_class => ZMQ::Message}) ⇒ Socket (constructor)
  To avoid rescuing exceptions, use the factory method .create for all socket creation.
- #more_parts? ⇒ Boolean
  Convenience method for checking on additional message parts.
- #recv_multipart(list, routing_envelope, flag = 0) ⇒ Object
  Should only be used for XREQ, XREP, DEALER and ROUTER type sockets.
- #recv_string(string, flags = 0) ⇒ Object
  Helper method to make a new #Message instance and convert its payload to a string.
- #recv_strings(list, flag = 0) ⇒ Object
  Receive a multipart message as a list of strings.
- #recvmsg(message, flags = 0) ⇒ Object
  Dequeues a message from the underlying queue.
- #recvmsgs(list, flag = 0) ⇒ Object
  Receive a multipart message as an array of objects (by default these are instances of Message).
- #send_and_close(message, flags = 0) ⇒ Object
  Sends a message.
- #send_string(string, flags = 0) ⇒ Object
  Helper method to make a new #Message instance out of the string passed in for transmission.
- #send_strings(parts, flags = 0) ⇒ Object
  Send a sequence of strings as a multipart message out of the parts passed in for transmission.
- #sendmsg(message, flags = 0) ⇒ Object
  Queues the message for transmission.
- #sendmsgs(parts, flags = 0) ⇒ Object
  Send a sequence of messages as a multipart message out of the parts passed in for transmission.
- #setsockopt(name, value, length = nil) ⇒ Object
  Set the queue options on this socket.
- #unbind(endpoint) ⇒ Object
  Unbind the socket from the given endpoint.
Constructor Details
#initialize(context_ptr, type, opts = {:receiver_class => ZMQ::Message}) ⇒ Socket
To avoid rescuing exceptions, use the factory method .create for all socket creation.
Allocates a socket of type type for sending and receiving data.
type can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::XREP, ZMQ::DEALER or ZMQ::ROUTER.
By default, this class uses ZMQ::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use ZMQ::ManagedMessage.
sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)
Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as ZMQ::Message.
Creation of a new Socket object can raise an exception. This occurs when the context_ptr is null or when the allocation of the 0mq socket within the context fails.
begin
  socket = Socket.new(context.pointer, ZMQ::REQ)
rescue ContextError => e
  # error handling
end
# File 'lib/ffi-rzmq/socket.rb', line 65
def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  # users may override the classes used for receiving; class must conform to the
  # same public API as ZMQ::Message
  @receiver_klass = opts[:receiver_class]

  context_ptr = context_ptr.pointer if context_ptr.kind_of?(ZMQ::Context)

  if context_ptr.nil? || context_ptr.null?
    raise ContextError.new 'zmq_socket', 0, ETERM, "Context pointer was null"
  else
    @socket = LibZMQ.zmq_socket context_ptr, type
    if @socket && !@socket.null?
      @name = SocketTypeNameMap[type]
    else
      raise ContextError.new 'zmq_socket', 0, ETERM, "Socket pointer was null"
    end
  end

  @longlong_cache = @int_cache = nil
  @more_parts_array = []
  @option_lookup = []
  populate_option_lookup

  define_finalizer
end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/ffi-rzmq/socket.rb', line 5
def name
  @name
end
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
# File 'lib/ffi-rzmq/socket.rb', line 5
def socket
  @socket
end
Class Method Details
.create(context_ptr, type, opts = {:receiver_class => ZMQ::Message}) ⇒ Object
Allocates a socket of type type for sending and receiving data.
type can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::XREP, ZMQ::DEALER or ZMQ::ROUTER.
By default, this class uses ZMQ::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use ZMQ::ManagedMessage.
sock = Socket.create(Context.create, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)
Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as ZMQ::Message.
Creation of a new Socket object can return nil when socket creation fails.
if (socket = Socket.create(context.pointer, ZMQ::REQ))
  ...
else
  STDERR.puts "Socket creation failed"
end
# File 'lib/ffi-rzmq/socket.rb', line 32
def self.create context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  new(context_ptr, type, opts) rescue nil
end
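The difference between the raising constructor and the nil-returning factory can be sketched without 0mq installed. The classes below (SketchSocket, SketchContextError) are hypothetical stand-ins invented for illustration; they only mirror the shape of .create shown above and are not part of ffi-rzmq.

```ruby
# Hypothetical stand-ins; not part of ffi-rzmq.
class SketchContextError < StandardError; end

class SketchSocket
  attr_reader :type

  # Same shape as Socket.create: delegate to new and turn any
  # StandardError into nil so callers can branch instead of rescuing.
  def self.create(context_ptr, type)
    new(context_ptr, type)
  rescue StandardError
    nil
  end

  def initialize(context_ptr, type)
    raise SketchContextError, "Context pointer was null" if context_ptr.nil?
    @type = type
  end
end

ok = SketchSocket.create(:fake_context, :req)
failed = SketchSocket.create(nil, :req)

puts(ok ? "created #{ok.type} socket" : "Socket creation failed")
puts(failed ? "created" : "Socket creation failed")
```

One consequence of this design is that the reason for the failure is discarded; code that needs the error detail would have to call the constructor directly and rescue.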
Instance Method Details
#bind(address) ⇒ Object
Binds the socket to an address.
socket.bind("tcp://127.0.0.1:5555")
# File 'lib/ffi-rzmq/socket.rb', line 178
def bind address
  LibZMQ.zmq_bind @socket, address
end
#close ⇒ Object
Closes the socket. Any unprocessed messages in queue are sent or dropped depending upon the value of the socket option ZMQ::LINGER.
Returns 0 upon success or when the socket has already been closed. Returns -1 when the operation fails. Check ZMQ::Util.errno for the error code.
rc = socket.close
puts("Given socket was invalid!") unless 0 == rc
# File 'lib/ffi-rzmq/socket.rb', line 199
def close
  if @socket
    remove_finalizer
    rc = LibZMQ.zmq_close @socket
    @socket = nil
    release_cache
    rc
  else
    0
  end
end
#connect(address) ⇒ Object
Connects the socket to an address.
rc = socket.connect("tcp://127.0.0.1:5555")
# File 'lib/ffi-rzmq/socket.rb', line 186
def connect address
  rc = LibZMQ.zmq_connect @socket, address
end
#disconnect(endpoint) ⇒ Object
Disconnect the socket from the given endpoint.
# File 'lib/ffi-rzmq/socket.rb', line 490
def disconnect(endpoint)
  LibZMQ.zmq_disconnect(socket, endpoint)
end
#getsockopt(name, array) ⇒ Object
Get the options set on this socket.
name determines the socket option to request.
array should be an empty array; a result of the proper type (numeric, string, boolean) will be inserted into the first position.
Valid name values:
ZMQ::RCVMORE - true or false
ZMQ::HWM - integer
ZMQ::SWAP - integer
ZMQ::AFFINITY - bitmap in an integer
ZMQ::IDENTITY - string
ZMQ::RATE - integer
ZMQ::RECOVERY_IVL - integer
ZMQ::SNDBUF - integer
ZMQ::RCVBUF - integer
ZMQ::FD - fd in an integer
ZMQ::EVENTS - bitmap integer
ZMQ::LINGER - integer measured in milliseconds
ZMQ::RECONNECT_IVL - integer measured in milliseconds
ZMQ::BACKLOG - integer
ZMQ::RECOVERY_IVL_MSEC - integer measured in milliseconds
ZMQ::IPV4ONLY - integer
Returns 0 when the operation completed successfully. Returns -1 when this operation failed.
With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.
# retrieve high water mark
array = []
rc = socket.getsockopt(ZMQ::HWM, array)
hwm = array.first if ZMQ::Util.resultcode_ok?(rc)
# File 'lib/ffi-rzmq/socket.rb', line 463
def getsockopt name, array
  rc = __getsockopt__ name, array

  if Util.resultcode_ok?(rc) && (RCVMORE == name)
    # convert to boolean
    array[0] = 1 == array[0]
  end

  rc
end
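Because #getsockopt reports status through its return code and delivers the value through the array argument, callers often wrap the pair in a small helper. The sketch below is one possible wrapper; read_option and StubSocket are hypothetical names, the stub only imitates the calling convention, and treating any non-negative code as success is an assumption about what ZMQ::Util.resultcode_ok? checks.

```ruby
# Hypothetical stand-in that imitates the getsockopt calling convention:
# it writes the value into array[0] and returns 0, or returns -1 on failure.
class StubSocket
  OPTIONS = { hwm: 1_000, linger: 250 }.freeze

  def getsockopt(name, array)
    return -1 unless OPTIONS.key?(name)
    array[0] = OPTIONS[name]
    0
  end
end

# Hypothetical helper: returns the option value, or nil when the call failed.
# Assumes a non-negative return code means success.
def read_option(socket, name)
  result = []
  rc = socket.getsockopt(name, result)
  rc >= 0 ? result.first : nil
end

socket = StubSocket.new
hwm = read_option(socket, :hwm)
missing = read_option(socket, :no_such_option)
puts "hwm=#{hwm.inspect} missing=#{missing.inspect}"
```

Returning nil hides the error code, so a real wrapper would probably consult ZMQ::Util.errno before discarding the failure.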
#identity ⇒ Object
Convenience method for getting the value of the socket IDENTITY.
# File 'lib/ffi-rzmq/socket.rb', line 476
def identity
  array = []
  getsockopt IDENTITY, array
  array.at(0)
end
#identity=(value) ⇒ Object
Convenience method for setting the value of the socket IDENTITY.
# File 'lib/ffi-rzmq/socket.rb', line 484
def identity=(value)
  setsockopt IDENTITY, value.to_s
end
#more_parts? ⇒ Boolean
Convenience method for checking on additional message parts.
Equivalent to calling Socket#getsockopt with ZMQ::RCVMORE.
Warning: if the call to #getsockopt fails, this method will return false and swallow the error.
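message_parts = []
message = Message.new
rc = socket.recvmsg(message)
if ZMQ::Util.resultcode_ok?(rc)
  message_parts << message
  while socket.more_parts?
    message = Message.new
    rc = socket.recvmsg(message)
    message_parts.push(message) if ZMQ::Util.resultcode_ok?(rc)
  end
end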
# File 'lib/ffi-rzmq/socket.rb', line 168
def more_parts?
  rc = getsockopt ZMQ::RCVMORE, @more_parts_array

  Util.resultcode_ok?(rc) ? @more_parts_array.at(0) : false
end
#recv_multipart(list, routing_envelope, flag = 0) ⇒ Object
Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes a list for receiving the message body parts and a routing_envelope for receiving the message parts comprising the 0mq routing information.
# File 'lib/ffi-rzmq/socket.rb', line 408
def recv_multipart list, routing_envelope, flag = 0
  parts = []
  rc = recvmsgs parts, flag

  if Util.resultcode_ok?(rc)
    routing = true
    parts.each do |part|
      if routing
        routing_envelope << part
        routing = part.size > 0
      else
        list << part
      end
    end
  end

  rc
end
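The envelope/body split in the listing above can be tried on plain strings. The sketch below copies that loop into a hypothetical split_envelope function (not part of ffi-rzmq) and uses String#size in place of the message object's size, so it runs without a socket.

```ruby
# Hypothetical helper mirroring the loop in recv_multipart.
# Every part up to and including the first empty part goes to the
# envelope; everything after it goes to the body.
def split_envelope(parts)
  envelope = []
  body = []
  routing = true

  parts.each do |part|
    if routing
      envelope << part
      routing = part.size > 0 # an empty part ends the envelope
    else
      body << part
    end
  end

  [envelope, body]
end

envelope, body = split_envelope(["client-1", "", "hello", "world"])
p envelope # the identity frame plus the empty delimiter
p body     # the payload frames
```

Two details follow from the loop: the empty delimiter frame is stored in the envelope rather than dropped, and a message with no empty frame ends up entirely in the envelope with an empty body.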
#recv_string(string, flags = 0) ⇒ Object
Helper method to make a new #Message instance and convert its payload to a string.
flags may be ZMQ::DONTWAIT.
Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.
- The message could not be dequeued
- When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.
This method closes its temporary message before returning, whether or not the receive succeeded, so the caller has no message object lifecycle to manage.
# File 'lib/ffi-rzmq/socket.rb', line 340
def recv_string string, flags = 0
  message = @receiver_klass.new
  rc = recvmsg message, flags
  string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
  message.close
  rc
end
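A non-blocking receive usually sits inside a polling loop that distinguishes "nothing queued yet" (EAGAIN) from a real failure. The sketch below shows one way to structure that loop. FlakySocket, poll_string and SKETCH_DONTWAIT are hypothetical; the stub exposes its own errno for simplicity, whereas real code would read ZMQ::Util.errno and pass ZMQ::DONTWAIT.

```ruby
SKETCH_DONTWAIT = 1 # placeholder standing in for ZMQ::DONTWAIT

# Hypothetical stand-in: reports EAGAIN a fixed number of times,
# then delivers a payload through the caller's buffer.
class FlakySocket
  attr_reader :errno

  def initialize(failures)
    @failures = failures
    @errno = 0
  end

  def recv_string(buffer, _flags = 0)
    if @failures > 0
      @failures -= 1
      @errno = Errno::EAGAIN::Errno
      -1
    else
      buffer.replace("payload")
      0
    end
  end
end

# Hypothetical helper: retry on EAGAIN up to `attempts` times.
# Returns the received string, or nil on exhaustion or a real error.
def poll_string(socket, attempts:)
  buffer = String.new
  attempts.times do
    rc = socket.recv_string(buffer, SKETCH_DONTWAIT)
    return buffer if rc >= 0
    return nil unless socket.errno == Errno::EAGAIN::Errno
  end
  nil
end

received = poll_string(FlakySocket.new(2), attempts: 5)
gave_up = poll_string(FlakySocket.new(9), attempts: 3)
puts "received=#{received.inspect} gave_up=#{gave_up.inspect}"
```

A production loop would normally sleep or use a poller between attempts instead of spinning.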
#recv_strings(list, flag = 0) ⇒ Object
Receive a multipart message as a list of strings.
flag may be ZMQ::DONTWAIT. Any other flag will be removed.
# File 'lib/ffi-rzmq/socket.rb', line 353
def recv_strings list, flag = 0
  array = []
  rc = recvmsgs array, flag

  if Util.resultcode_ok?(rc)
    array.each do |message|
      list << message.copy_out_string
      message.close
    end
  end

  rc
end
#recvmsg(message, flags = 0) ⇒ Object
Dequeues a message from the underlying queue. By default, this is a blocking operation.
flags may take two values:
- 0 (default) - blocking operation
- ZMQ::DONTWAIT - non-blocking operation
Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.
- The message could not be dequeued
- When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.
The application code is responsible for handling the message object lifecycle when #recvmsg returns an error code.
# File 'lib/ffi-rzmq/socket.rb', line 319
def recvmsg message, flags = 0
  #LibZMQ.zmq_recvmsg @socket, message.address, flags
  __recvmsg__(@socket, message.address, flags)
end
#recvmsgs(list, flag = 0) ⇒ Object
Receive a multipart message as an array of objects (by default these are instances of Message).
flag may be ZMQ::DONTWAIT. Any other flag will be removed.
# File 'lib/ffi-rzmq/socket.rb', line 373
def recvmsgs list, flag = 0
  flag = DONTWAIT if dontwait?(flag)

  message = @receiver_klass.new
  rc = recvmsg message, flag

  if Util.resultcode_ok?(rc)
    list << message

    # check rc *first*; necessary because the call to #more_parts? can reset
    # the zmq_errno to a weird value, so the zmq_errno that was set on the
    # call to #recv gets lost
    while Util.resultcode_ok?(rc) && more_parts?
      message = @receiver_klass.new
      rc = recvmsg message, flag

      if Util.resultcode_ok?(rc)
        list << message
      else
        message.close
        list.each { |msg| msg.close }
        list.clear
      end
    end
  else
    message.close
  end

  rc
end
#send_and_close(message, flags = 0) ⇒ Object
Sends a message. This will automatically close the message for both successful and failed sends.
Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.
- The message could not be enqueued
- When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 296
def send_and_close message, flags = 0
  rc = sendmsg message, flags
  message.close
  rc
end
#send_string(string, flags = 0) ⇒ Object
Helper method to make a new #Message instance out of the string passed in for transmission.
flags may be ZMQ::DONTWAIT, ZMQ::SNDMORE, or both.
Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.
- The message could not be enqueued
- When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 244
def send_string string, flags = 0
  message = Message.new string
  send_and_close message, flags
end
#send_strings(parts, flags = 0) ⇒ Object
Send a sequence of strings as a multipart message out of the parts passed in for transmission. Every element of parts should be a String.
flags may be ZMQ::DONTWAIT.
Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions.
- A message could not be enqueued
- When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 263
def send_strings parts, flags = 0
  send_multiple(parts, flags, :send_string)
end
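The send_multiple helper is not shown on this page. The sketch below illustrates how a multipart send is framed in 0mq in general: every part except the last is sent with the "more parts follow" flag. It is an assumption that send_multiple works this way internally; send_all, RecordingSocket and SKETCH_SNDMORE are hypothetical names, and the flag value is a placeholder for ZMQ::SNDMORE.

```ruby
SKETCH_SNDMORE = 2 # placeholder standing in for ZMQ::SNDMORE

# Hypothetical stand-in that records each send instead of transmitting.
class RecordingSocket
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_string(string, flags = 0)
    @sent << [string, flags]
    0
  end
end

# Hypothetical helper: flag every part but the last with SNDMORE and
# stop at the first failure, returning that failure's code.
def send_all(socket, parts, flags = 0)
  return -1 if parts.nil? || parts.empty?

  parts[0..-2].each do |part|
    rc = socket.send_string(part, flags | SKETCH_SNDMORE)
    return rc if rc < 0
  end

  socket.send_string(parts[-1], flags)
end

socket = RecordingSocket.new
rc = send_all(socket, %w[header body trailer])
p rc
p socket.sent
```

Stopping at the first failed part matters because the receiver treats the sequence as one message; continuing after a failure would send frames whose earlier parts never arrived.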
#sendmsg(message, flags = 0) ⇒ Object
Queues the message for transmission. Message is assumed to conform to the same public API as #Message.
flags may take these values:
- 0 (default) - blocking operation
- ZMQ::DONTWAIT - non-blocking operation
- ZMQ::SNDMORE - this message is part of a multi-part message
Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.
- The message could not be enqueued
- When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 227
def sendmsg message, flags = 0
  __sendmsg__(@socket, message.address, flags)
end
#sendmsgs(parts, flags = 0) ⇒ Object
Send a sequence of messages as a multipart message out of the parts passed in for transmission. Every element of parts should be a Message (or subclass).
flags may be ZMQ::DONTWAIT.
Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions.
- A message could not be enqueued
- When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 281
def sendmsgs parts, flags = 0
  send_multiple(parts, flags, :sendmsg)
end
#setsockopt(name, value, length = nil) ⇒ Object
Set the queue options on this socket.
Valid name values that take a numeric value are:
ZMQ::HWM
ZMQ::SWAP (version 2 only)
ZMQ::AFFINITY
ZMQ::RATE
ZMQ::RECOVERY_IVL
ZMQ::MCAST_LOOP (version 2 only)
ZMQ::LINGER
ZMQ::RECONNECT_IVL
ZMQ::BACKLOG
ZMQ::RECOVERY_IVL_MSEC (version 2 only)
ZMQ::RECONNECT_IVL_MAX (version 3 only)
ZMQ::MAXMSGSIZE (version 3 only)
ZMQ::SNDHWM (version 3 only)
ZMQ::RCVHWM (version 3 only)
ZMQ::MULTICAST_HOPS (version 3 only)
ZMQ::RCVTIMEO (version 3 only)
ZMQ::SNDTIMEO (version 3 only)
Valid name values that take a string value are:
ZMQ::IDENTITY (version 2/3 only)
ZMQ::SUBSCRIBE
ZMQ::UNSUBSCRIBE
Returns 0 when the operation completed successfully. Returns -1 when this operation failed.
With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.
rc = socket.setsockopt(ZMQ::LINGER, 1_000)
ZMQ::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed")
# File 'lib/ffi-rzmq/socket.rb', line 126
def setsockopt name, value, length = nil
  if 1 == @option_lookup[name]
    length = 8
    pointer = LibC.malloc length
    pointer.write_long_long value

  elsif 0 == @option_lookup[name]
    length = 4
    pointer = LibC.malloc length
    pointer.write_int value

  elsif 2 == @option_lookup[name]
    # Strings are treated as pointers by FFI so we'll just pass it through
    length ||= value.size
    pointer = value

  end

  rc = LibZMQ.zmq_setsockopt @socket, name, pointer, length
  LibC.free(pointer) unless pointer.is_a?(String) || pointer.nil? || pointer.null?
  rc
end
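The listing above picks a buffer width from the option's lookup code: 4 bytes for an int, 8 bytes for a long long, and the string's own size for string options. The sketch below reproduces those widths with Array#pack as a stand-in for the FFI pointer writes (an illustration only; ffi-rzmq does not use pack here), and shows why the optional length argument can matter for strings.

```ruby
# Array#pack stands in for pointer.write_int / pointer.write_long_long.
int_bytes = [1_000].pack("l")       # native 32-bit signed int
long_long_bytes = [1_000].pack("q") # native 64-bit signed integer

puts "int option buffer:       #{int_bytes.bytesize} bytes"
puts "long long option buffer: #{long_long_bytes.bytesize} bytes"

# For string options the default length is value.size, which counts
# characters. A multibyte string has more bytes than characters, so a
# caller may want to pass the byte length explicitly.
identity = "caf\u00e9"
puts "characters: #{identity.size}, bytes: #{identity.bytesize}"
```

With an ASCII-only identity the two counts agree and the default length is sufficient.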
#unbind(endpoint) ⇒ Object
Unbind the socket from the given endpoint.
# File 'lib/ffi-rzmq/socket.rb', line 496
def unbind(endpoint)
  LibZMQ.zmq_unbind(socket, endpoint)
end