Class: RbZMQ::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/rbzmq/socket.rb

Overview

RbZMQ::Socket

Constant Summary collapse

DEFAULT_TIMEOUT =

Default timeout.

5_000

Instance Method Summary collapse

Constructor Details

#initialize(type, opts = {}) ⇒ Socket

Allocates a socket of given type for sending and receiving data.

Parameters:

  • type (Integer)

    ZMQ socket type, on if ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::REP, ZMQ::DEALER or ZMQ::ROUTER.

  • opts (Hash) (defaults to: {})

    Option hash. :ctx will be removed, all other options will be passed to ZMQ::Socket.new.

Options Hash (opts):

  • :ctx (Context)

    ZMQ context used to initialize socket. By default Context.global is used. Must be Context, ZMQ::Context or an FFI::Pointer.

Raises:


40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/rbzmq/socket.rb', line 40

def initialize(type, opts = {})
  ctx = opts.fetch(:ctx) { RbZMQ::Context.global }
  ctx = ctx.pointer if ctx.respond_to? :pointer

  unless ctx.is_a?(FFI::Pointer)
    raise ArgumentError.new "Context must be ZMQ::Context or " \
      "RbZMQ::Context (respond to #pointer) or must be a FFI::Pointer, "\
      "but #{ctx.inspect} given."
  end

  @zmq_ctx       = ctx
  @zmq_socket    = ZMQ::Socket.new ctx, type
rescue ZMQ::ZeroMQError => err
  raise ZMQError.new err
end

Instance Method Details

#bind(address) ⇒ RbZMQ::Socket

Bind this socket to given address.

Examples:

socket = RbZMQ::Socket.new ZMQ::PUB
socket.bind "tcp://127.0.0.1:5555"

Parameters:

  • address (String)

    Address to bind. Must be a supported protocol.

Returns:

Raises:


74
75
76
77
# File 'lib/rbzmq/socket.rb', line 74

def bind(address)
  ZMQError.error! zmq_socket.bind address
  self
end

#closeBoolean

Closes the socket. Any unprocessed messages in queue are sent or dropped depending upon the value of the socket option ZMQ::LINGER.

Examples:

socket = RbZMQ::Socket.new ZMQ::PULL
socket.close

Returns:

  • (Boolean)

    Return true upon success or when the socket has already been closed, false otherwise. Use #close! to raise an error on failure.


110
111
112
# File 'lib/rbzmq/socket.rb', line 110

def close
  ZMQError.ok? zmq_socket.close
end

#close!Boolean

Closes the socket. Any unprocessed messages in queue are sent or dropped depending upon the value of the socket option ZMQ::LINGER.

Examples:

socket = RbZMQ::Socket.new ZMQ::PULL
socket.close!

Returns:

  • (Boolean)

    True.

Raises:

  • (ZMQError)

    Error raised on failure.


125
126
127
128
# File 'lib/rbzmq/socket.rb', line 125

def close!
  ZMQError.error! zmq_socket.close
  true
end

#connect(address) ⇒ RbZMQ::Socket

Connect to given address.

Examples:

Bind to single remote address

socket = RbZMQ::Socket.new ZMQ::PUSH
socket.connect "tcp://127.0.0.1:5555"

Bind to multiple endpoints

socket = RbZMQ::Socket.new ZMQ::ROUTER
socket.connect "tcp://127.0.0.1:5555"
socket.connect "tcp://127.0.0.1:6666"

Returns:

Raises:


94
95
96
97
# File 'lib/rbzmq/socket.rb', line 94

def connect(address)
  ZMQError.error! zmq_socket.connect address
  self
end

#recv(flags = 0, opts = {}) ⇒ RbZMQ::Message

Dequeues a message from the underlying queue.

By default, this is a blocking operation.

Examples:

message = socket.recv

Parameters:

  • flags (Integer) (defaults to: 0)

    Can be ZMQ::DONTWAIT.

  • opts (Hash) (defaults to: {})

    Options.

Options Hash (opts):

  • :block (Boolean)

    If false operation will be non-blocking. Defaults to true.

  • :timeout (Integer)

    Raise a EAGAIN error if nothing was received within given amount of milliseconds. Defaults to DEFAULT_TIMEOUT. The values `:blocking`, `:infinity` or `-1` will wait forever.

Returns:

Raises:

  • (ZMQError)

    Raise error under two conditions.

    1. The message could not be dequeued

    2. When mode is non-blocking and the socket returned EAGAIN.

  • (Errno::EAGAIN)

    When timeout was reached without receiving a message.


218
219
220
221
222
223
224
225
226
227
# File 'lib/rbzmq/socket.rb', line 218

def recv(flags = 0, opts = {})
  opts, flags = flags, 0 if flags.is_a?(Hash)

  with_recv_timeout(opts) do
    rc = zmq_socket.recvmsg((message = ZMQ::Message.new),
                            convert_flags(opts, flags, [:block]))
    ZMQError.error! rc
    RbZMQ::Message.new(message)
  end
end

#send(messages, flags = 0, opts = {}) ⇒ RbZMQ::Socket

Queues one or more messages for transmission.

Examples:

Send single message or string

begin
  message = RbZMQ::Message.new
  socket.send message
rescue RbZMQ::ZMQError => err
  puts 'Send failed.'
end

Send multiple messages

socket.send ["A", "B", "C 2"]

Parameters:

  • messages (RbZMQ::Message, String, #each)

    A Message or string message to send, or a list of messages responding to `#each`.

  • flags (Integer) (defaults to: 0)

    May contains of the following flags:

    • 0 (default) - blocking operation

    • ZMQ::DONTWAIT - non-blocking operation

    • ZMQ::SNDMORE - this message or all messages are part of a multi-part message

  • opts (Hash) (defaults to: {})

    Options.

Options Hash (opts):

  • :block (Boolean)

    If method call should block. Will set ZMQ::DONTWAIT flag if false. Defaults to true.

  • :more (Boolean)

    If this message or all messages are part of a multi-part message

Returns:

Raises:

  • (ZMQError)

    Raises an error under two conditions:

    1. The message(s) could not be enqueued

    2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.


177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/rbzmq/socket.rb', line 177

def send(messages, flags = 0, opts = {})
  opts, flags = flags, 0 if flags.is_a?(Hash)
  flags       = convert_flags(opts, flags)

  if messages.respond_to?(:each)
    send_multiple(messages, flags)
  else
    send_single(messages, flags)
  end

  self
end

#setsockopt(opt, val) ⇒ Boolean

Set a ZMQ socket object.

Returns:

  • (Boolean)

    True if success, false otherwise.

See Also:

  • zmq_setsockopt

136
137
138
# File 'lib/rbzmq/socket.rb', line 136

def setsockopt(opt, val)
  ZMQError.ok? zmq_socket.setsockopt(opt, val)
end

#socketObject

Return ZMQ socket pointer. Required interface for ZMQ::Poller.


58
59
60
# File 'lib/rbzmq/socket.rb', line 58

def socket
  @zmq_socket.socket
end