Class: ZMQ::Socket
- Inherits:
-
Object
- Object
- ZMQ::Socket
- Defined in:
- lib/0mq/socket.rb,
lib/0mq/socket/options.rb
Overview
See api.zeromq.org/4-0:zmq-socket. Not thread safe.
Constant Summary collapse
- @@get_options =
{ :RCVMORE => :bool, :RCVHWM => :int, :AFFINITY => :uint64, :IDENTITY => :string, :RATE => :int, :RECOVERY_IVL => :int, :SNDBUF => :int, :RCVBUF => :int, :LINGER => :int, :RECONNECT_IVL => :int, :RECONNECT_IVL_MAX => :int, :BACKLOG => :int, :MAXMSGSIZE => :int64, :MULTICAST_HOPS => :int, :RCVTIMEO => :int, :SNDTIMEO => :int, :IPV6 => :bool, :IPV4ONLY => :bool, :IMMEDIATE => :bool, :FD => :int, :EVENTS => :int, :LAST_ENDPOINT => :string, :TCP_KEEPALIVE => :int, :TCP_KEEPALIVE_IDLE => :int, :TCP_KEEPALIVE_CNT => :int, :TCP_KEEPALIVE_INTVL => :int, :MECHANISM => :int, :PLAIN_SERVER => :int, :PLAIN_USERNAME => :string, :PLAIN_PASSWORD => :string, :CURVE_PUBLICKEY => :string, :CURVE_SECRETKEY => :string, :CURVE_SERVERKEY => :string, :ZAP_DOMAIN => :string, }
- @@set_options =
{ :SNDHWM => :int, :RCVHWM => :int, :AFFINITY => :uint64, :SUBSCRIBE => :string, :UNSUBSCRIBE => :string, :IDENTITY => :string, :RATE => :int, :RECOVERY_IVL => :int, :SNDBUF => :int, :RCVBUF => :int, :LINGER => :int, :RECONNECT_IVL => :int, :RECONNECT_IVL_MAX => :int, :RECONNECT_IVL => :int, :BACKLOG => :int, :MAXMSGSIZE => :int64, :MULTICAST_HOPS => :int, :RCVTIMEO => :int, :SNDTIMEO => :int, :IPV6 => :bool, :IPV4ONLY => :bool, :IMMEDIATE => :bool, :ROUTER_HANDOVER => :int, :ROUTER_MANDATORY => :int, :ROUTER_RAW => :int, :PROBE_ROUTER => :int, :XPUB_VERBOSE => :int, :REQ_CORRELATE => :int, :REQ_RELAXED => :int, :TCP_KEEPALIVE => :int, :TCP_KEEPALIVE_IDLE => :int, :TCP_KEEPALIVE_CNT => :int, :TCP_KEEPALIVE_INTVL => :int, :TCP_ACCEPT_FILTER => :string, :PLAIN_SERVER => :int, :PLAIN_USERNAME => :string, :PLAIN_PASSWORD => :string, :CURVE_SERVER => :int, :CURVE_PUBLICKEY => :string, :CURVE_SECRETKEY => :string, :CURVE_SERVERKEY => :string, :ZAP_DOMAIN => :string, :CONFLATE => :bool, }
- @@option_types =
Set up map of option codes to option types
{}
Instance Attribute Summary collapse
-
#context ⇒ Object
readonly
The socket’s ZMQ::Context.
-
#pointer ⇒ Object
readonly
The FFI pointer to the socket.
-
#type ⇒ Object
readonly
The socket’s ZeroMQ socket type (e.g. ZMQ::ROUTER).
Class Method Summary collapse
-
.finalizer(pointer, context, pid) ⇒ Object
Create a safe finalizer for the socket pointer to close on GC.
Instance Method Summary collapse
-
#bind(endpoint) ⇒ Object
Bind to an endpoint.
-
#close ⇒ Object
Close the socket.
-
#closed? ⇒ Boolean
Returns true if the socket is closed.
-
#connect(endpoint) ⇒ Object
Connect to an endpoint.
-
#disconnect(endpoint) ⇒ Object
Disconnect from an endpoint.
-
#get_opt(option) ⇒ Object
Get a socket option.
-
#initialize(type, opts = {}) ⇒ Socket
constructor
A new instance of Socket.
-
#inspect ⇒ Object
Show a useful inspect output.
-
#recv_array(flags = 0) ⇒ Object
Receive a multipart message as an array of strings.
-
#recv_string(flags = 0) ⇒ Object
Receive a string from the socket.
-
#recv_with_routing ⇒ Object
Receive a multipart message as a routing array and a body array. All parts before an empty part are considered routing parts, and all parts after the empty part are considered body parts.
-
#send_array(array, flags = 0) ⇒ Object
Send a multipart message as an array of strings.
-
#send_string(string, flags = 0) ⇒ Object
Send a string to the socket.
-
#send_with_routing(routing, body) ⇒ Object
Send a multipart message as a routing array and a body array. All parts before an empty part are considered routing parts, and all parts after the empty part are considered body parts.
-
#set_opt(option, value) ⇒ Object
Set a socket option.
-
#to_ptr ⇒ Object
Returns the socket’s FFI pointer.
-
#type_sym ⇒ Object
Get the socket type name as a symbol.
-
#unbind(endpoint) ⇒ Object
Unbind from an endpoint.
Constructor Details
#initialize(type, opts = {}) ⇒ Socket
Returns a new instance of Socket.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/0mq/socket.rb', line 17
# Create a socket of the given type inside a context.
#
# @param type the ZeroMQ socket type, handed unchanged to zmq_socket
# @param opts [Hash] recognised key: :context, the context to create the
#   socket in (ZMQ::DefaultContext when the key is absent)
# @return [Socket] a new instance of Socket
def initialize(type, opts = {})
  @closed  = false
  @context = opts.fetch(:context, ZMQ::DefaultContext)
  @type    = type

  @pointer = LibZMQ.zmq_socket(@context.pointer, @type)
  ZMQ.error_check(true) if @pointer.null?

  # Scratch memory sized for one LibZMQ::Message; the send/receive
  # methods reuse it through @msgptr.
  @msgptr = FFI::MemoryPointer.new(LibZMQ::Message.size, 1, false)

  @context.send(:register_socket_pointer, @pointer)

  # The finalizer is built from plain values (pointer, context, pid) by a
  # class method rather than from a block defined here.
  cleanup = self.class.finalizer(@pointer, @context, Process.pid)
  ObjectSpace.define_finalizer(self, cleanup)
end
Instance Attribute Details
#context ⇒ Object (readonly)
The socket’s ZMQ::Context.
13 14 15 |
# File 'lib/0mq/socket.rb', line 13
# The socket's ZMQ::Context.
#
# @return [Object] the context the socket was created in; nil once
#   #close has released the socket
def context
  @context
end
#pointer ⇒ Object (readonly)
The FFI pointer to the socket.
11 12 13 |
# File 'lib/0mq/socket.rb', line 11
# The FFI pointer to the socket.
#
# @return [Object] the pointer returned by zmq_socket; nil once #close
#   has released the socket
def pointer
  @pointer
end
#type ⇒ Object (readonly)
The socket’s ZeroMQ socket type (e.g. ZMQ::ROUTER).
15 16 17 |
# File 'lib/0mq/socket.rb', line 15
# The socket's ZeroMQ socket type (e.g. ZMQ::ROUTER).
#
# @return [Object] the type value given to the constructor
def type
  @type
end
Class Method Details
.finalizer(pointer, context, pid) ⇒ Object
Create a safe finalizer for the socket pointer to close on GC
61 62 63 64 65 66 67 68 |
# File 'lib/0mq/socket.rb', line 61
# Build the proc that closes a socket pointer when its owner is
# garbage collected.
#
# @param pointer the socket pointer to close
# @param context the context the pointer is registered with
# @param pid [Integer] pid of the process that created the socket
# @return [Proc] finalizer; it does nothing when run in a process whose
#   pid differs from +pid+
def self.finalizer(pointer, context, pid)
  Proc.new do
    # Only the process that created the socket may close it.
    next unless Process.pid == pid

    context.send(:unregister_socket_pointer, pointer)
    LibZMQ.zmq_close(pointer)
  end
end
Instance Method Details
#bind(endpoint) ⇒ Object
Bind to an endpoint
76 77 78 79 |
# File 'lib/0mq/socket.rb', line 76
# Bind to an endpoint.
#
# @param endpoint the endpoint, passed unchanged to zmq_bind
# @return [Object] nil when zmq_bind succeeds; otherwise whatever
#   ZMQ.error_check does with the failure
def bind(endpoint)
  status = LibZMQ.zmq_bind(@pointer, endpoint)
  ZMQ.error_check(true) if status == -1
end
#close ⇒ Object
Close the socket
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/0mq/socket.rb', line 38
# Close the socket.
#
# Always marks the socket as closed. When a pointer is still held it also
# removes the GC finalizer, unregisters the pointer from the context and
# calls zmq_close. Calling it again afterwards is a no-op.
#
# @return [nil]
def close
  @closed = true
  return unless @pointer

  @temp_buffers.clear if @temp_buffers

  # The socket is being closed explicitly, so the GC hook must not run.
  ObjectSpace.undefine_finalizer(self)
  @context.send(:unregister_socket_pointer, @pointer)

  status = LibZMQ.zmq_close(@pointer)
  ZMQ.error_check(true) if status == -1

  @pointer = nil
  @context = nil
end
#closed? ⇒ Boolean
Returns true if the socket is closed.
56 57 58 |
# File 'lib/0mq/socket.rb', line 56
# Whether #close has been called on this socket.
#
# @return [Boolean] false after construction, true once #close has run
def closed?
  @closed
end
#connect(endpoint) ⇒ Object
Connect to an endpoint
82 83 84 85 |
# File 'lib/0mq/socket.rb', line 82
# Connect to an endpoint.
#
# @param endpoint the endpoint, passed unchanged to zmq_connect
# @return [Object] nil when zmq_connect succeeds; otherwise whatever
#   ZMQ.error_check does with the failure
def connect(endpoint)
  status = LibZMQ.zmq_connect(@pointer, endpoint)
  ZMQ.error_check(true) if status == -1
end
#disconnect(endpoint) ⇒ Object
Disconnect from an endpoint
94 95 96 97 |
# File 'lib/0mq/socket.rb', line 94
# Disconnect from an endpoint.
#
# @param endpoint the endpoint, passed unchanged to zmq_disconnect
# @return [Object] nil when zmq_disconnect succeeds; otherwise whatever
#   ZMQ.error_check does with the failure
def disconnect(endpoint)
  status = LibZMQ.zmq_disconnect(@pointer, endpoint)
  ZMQ.error_check(true) if status == -1
end
#get_opt(option) ⇒ Object
Get a socket option
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/0mq/socket.rb', line 202
# Get a socket option.
#
# @param option the option code; must be a key of @@option_types
# @return [String, Boolean, Object] the option value, decoded according
#   to the option's registered type
# @raise [ArgumentError] if the option code is unknown
def get_opt(option)
  type = @@option_types.fetch(option) do
    raise ArgumentError, "Unknown option: #{option}"
  end

  value, size = get_opt_pointers type

  rc = LibZMQ.zmq_getsockopt @pointer, option, value, size
  ZMQ.error_check true if rc == -1

  case type
  when :string
    length = size.read_int
    # A zero-length value (e.g. an unset identity) has nothing to read.
    return '' unless length > 0

    data = value.read_string(length)
    # Character-string options are reported with their trailing NUL, while
    # binary options (such as IDENTITY) are not. Drop the terminator only
    # when it is actually there, so binary values keep their last byte.
    data.end_with?("\0") ? data[0...-1] : data
  when :bool
    value.read_int == 1
  else
    value.send :"read_#{type}"
  end
end
#inspect ⇒ Object
Show a useful inspect output
33 34 35 |
# File 'lib/0mq/socket.rb', line 33 def inspect "#<#{self.class}:#{type_sym}:#{object_id.to_s(16)}>" end |
#recv_array(flags = 0) ⇒ Object
Receive a multipart message as an array of strings
141 142 143 144 145 146 147 148 |
# File 'lib/0mq/socket.rb', line 141
# Receive a multipart message as an array of strings.
#
# Reads one part, then keeps reading for as long as the RCVMORE option
# reports that more parts follow.
#
# @param flags [Integer] flags forwarded to every #recv_string call
# @return [Array<String>] the message parts in arrival order
def recv_array(flags = 0)
  parts = [recv_string(flags)]
  parts << recv_string(flags) while get_opt(ZMQ::RCVMORE)
  parts
end
#recv_string(flags = 0) ⇒ Object
Receive a string from the socket
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/0mq/socket.rb', line 117
# Receive a string from the socket.
#
# @param flags [Integer] flags passed to zmq_recvmsg
# @return [String] the payload of the received message part
def recv_string(flags = 0)
  rc = LibZMQ.zmq_msg_init @msgptr
  ZMQ.error_check true if rc == -1

  begin
    rc = LibZMQ.zmq_recvmsg @pointer, @msgptr, flags
    ZMQ.error_check true if rc == -1

    str = LibZMQ.zmq_msg_data(@msgptr)
                .read_string(LibZMQ.zmq_msg_size(@msgptr))
  ensure
    # The message was initialised above, so it must be closed even when the
    # receive fails (for example a non-blocking receive with nothing queued).
    close_rc = LibZMQ.zmq_msg_close @msgptr
  end
  # Only reached when the receive succeeded; a receive error is not masked.
  ZMQ.error_check true if close_rc == -1

  str
end
#recv_with_routing ⇒ Object
Receive a multipart message as a routing array and a body array. All parts before an empty part are considered routing parts, and all parts after the empty part are considered body parts. The empty delimiter part is not included in the resulting arrays.
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/0mq/socket.rb', line 162
# Receive a multipart message as a routing array and a body array.
#
# Parts before the first empty part are routing parts; parts after it are
# body parts. The empty delimiter itself is not returned.
#
# @return [Array(Array<String>, Array<String>)] [routing, body]; body is
#   empty when nothing follows the delimiter
# @raise [ArgumentError] if the message ends before an empty delimiter
#   part is seen
def recv_with_routing
  routing = []
  body = []

  loop do
    part = recv_string
    break if part.empty?

    routing << part
    unless get_opt(ZMQ::RCVMORE)
      raise ArgumentError, "Expected empty routing delimiter in "\
                           "multipart message: #{routing}"
    end
  end

  # Check RCVMORE before every body read: when the delimiter was the last
  # part, reading again would wait for (and consume) the next message.
  body << recv_string while get_opt(ZMQ::RCVMORE)

  [routing, body]
end
#send_array(array, flags = 0) ⇒ Object
Send a multipart message as an array of strings
134 135 136 137 138 |
# File 'lib/0mq/socket.rb', line 134
# Send a multipart message as an array of strings.
#
# Every part except the last is sent with ZMQ::SNDMORE added to +flags+;
# the last part is sent with +flags+ alone.
#
# @param array [#to_a] the message parts, in order
# @param flags [Integer] flags applied to every part
# @return [Object] the result of the final #send_string call
def send_array(array, flags = 0)
  *leading, final = array.to_a

  leading.each { |part| send_string(part, ZMQ::SNDMORE | flags) }
  send_string(final, flags)
end
#send_string(string, flags = 0) ⇒ Object
Send a string to the socket
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/0mq/socket.rb', line 100
# Send a string to the socket.
#
# The string is copied into a buffer from LibC.malloc, and LibC::Free is
# registered as the message's free callback.
#
# @param string [#to_s] the payload; converted with to_s
# @param flags [Integer] flags passed to zmq_sendmsg
# @return [Object] nil when every call succeeds; otherwise whatever
#   ZMQ.error_check does with the failure
def send_string(string, flags = 0)
  string = string.to_s
  size = string.respond_to?(:bytesize) ? string.bytesize : string.size
  @msgbuf = LibC.malloc size
  @msgbuf.write_string string, size

  rc = LibZMQ.zmq_msg_init_data @msgptr, @msgbuf, size, LibC::Free, nil
  ZMQ.error_check true if rc == -1

  begin
    rc = LibZMQ.zmq_sendmsg @pointer, @msgptr, flags
    ZMQ.error_check true if rc == -1
  ensure
    # Per the ZeroMQ docs a failed send leaves the message with the caller,
    # so close it on every path; otherwise the malloc'd buffer is never
    # released.
    close_rc = LibZMQ.zmq_msg_close @msgptr
  end
  # Only reached when the send succeeded; a send error is not masked.
  ZMQ.error_check true if close_rc == -1
end
#send_with_routing(routing, body) ⇒ Object
Send a multipart message as a routing array and a body array. All parts before an empty part are considered routing parts, and all parts after the empty part are considered body parts. The empty delimiter part should not be included in the input arrays.
154 155 156 |
# File 'lib/0mq/socket.rb', line 154
# Send a multipart message built from a routing array and a body array.
#
# An empty delimiter part is inserted between the two, so neither input
# should contain it.
#
# @param routing the routing parts
# @param body the body parts
# @return [Object] the result of #send_array
def send_with_routing(routing, body)
  frames = [*routing, '', *body]
  send_array(frames)
end
#set_opt(option, value) ⇒ Object
Set a socket option
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/0mq/socket.rb', line 180
# Set a socket option.
#
# @param option the option code; must be a key of @@option_types
# @param value the new value; string options are passed as given, other
#   types are first written into a freshly allocated FFI::MemoryPointer
# @return [Object] the value handed to zmq_setsockopt: the string for
#   string options, the FFI::MemoryPointer otherwise
# @raise [ArgumentError] if the option code is unknown
def set_opt(option, value)
  type = @@option_types.fetch(option) do
    raise ArgumentError, "Unknown option: #{option}"
  end

  if type == :string
    # zmq_setsockopt takes a length in bytes; the character count is
    # smaller for multibyte strings and would truncate the value.
    length = value.respond_to?(:bytesize) ? value.bytesize : value.size
  else
    if type == :bool
      valptr = FFI::MemoryPointer.new(:int)
      valptr.write_int(value ? 1 : 0)
    else
      valptr = FFI::MemoryPointer.new(type)
      valptr.send :"write_#{type}", value
    end
    value = valptr
    length = valptr.size
  end

  rc = LibZMQ.zmq_setsockopt @pointer, option, value, length
  ZMQ.error_check true if rc == -1

  value
end
#to_ptr ⇒ Object
Returns the socket’s FFI pointer.
221 222 223 |
# File 'lib/0mq/socket.rb', line 221
# Returns the socket's FFI pointer.
#
# @return [Object] the same object as #pointer; nil once #close has
#   released the socket
def to_ptr
  @pointer
end
#type_sym ⇒ Object
Get the socket type name as a symbol
71 72 73 |
# File 'lib/0mq/socket.rb', line 71
# Get the socket type name as a symbol.
#
# @return [Symbol] the ZMQ::SocketTypeNameMap entry for #type, as a symbol
def type_sym
  type_name = ZMQ::SocketTypeNameMap[type]
  type_name.to_sym
end
#unbind(endpoint) ⇒ Object
Unbind from an endpoint
88 89 90 91 |
# File 'lib/0mq/socket.rb', line 88
# Unbind from an endpoint.
#
# @param endpoint the endpoint, passed unchanged to zmq_unbind
# @return [Object] nil when zmq_unbind succeeds; otherwise whatever
#   ZMQ.error_check does with the failure
def unbind(endpoint)
  status = LibZMQ.zmq_unbind(@pointer, endpoint)
  ZMQ.error_check(true) if status == -1
end