Module: ZMQMachine::Socket::Base

Included in:
Pair, Pub, Pull, Push, Rep, Req, Sub, XRep, XReq
Defined in:
lib/zm/sockets/base.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#kind ⇒ Object (readonly)

Returns the value of attribute kind.



42
43
44
# File 'lib/zm/sockets/base.rb', line 42

# Read-only accessor.
#
# @return [Object] whatever is currently stored in @kind
def kind() @kind; end

#poll_options ⇒ Object (readonly)

Returns the value of attribute poll_options.



43
44
45
# File 'lib/zm/sockets/base.rb', line 43

# Read-only accessor.
#
# @return [Object] whatever is currently stored in @poll_options
def poll_options() @poll_options; end

#raw_socket ⇒ Object (readonly)

Returns the value of attribute raw_socket.



42
43
44
# File 'lib/zm/sockets/base.rb', line 42

# Read-only accessor.
#
# @return [Object] whatever is currently stored in @raw_socket
def raw_socket() @raw_socket; end

Class Method Details

.create(context, handler) ⇒ Object



45
46
47
48
49
50
51
52
53
54
# File 'lib/zm/sockets/base.rb', line 45

# Factory that builds a socket without letting construction errors escape.
#
# Delegates to +new+ and returns the resulting instance. If +new+ raises a
# StandardError, the error is deliberately swallowed and nil is returned, so
# callers must check the result for nil.
#
# @param context [Object] passed through to +new+
# @param handler [Object] passed through to +new+
# @return [Object, nil] the new socket, or nil when construction failed
def self.create(context, handler)
  new(context, handler)
rescue StandardError
  # Best-effort construction: signal failure with nil rather than an exception.
  nil
end

Instance Method Details

#attach(handler) ⇒ Object

Call the handler’s #on_attach method and pass itself so the handler may complete its setup.

The #on_attach method is passed a single argument named socket. The method should probably #bind or #connect to an address and potentially schedule (via timer) an operation or begin sending messages immediately.

Raises:

  • (ArgumentError)


79
80
81
82
# File 'lib/zm/sockets/base.rb', line 79

# Hands this socket to the handler by invoking its #on_attach callback.
#
# @param handler [#on_attach] object that will receive this socket
# @return [Object] whatever the handler's #on_attach returns
# @raise [ArgumentError] when the handler does not respond to #on_attach
def attach(handler)
  unless handler.respond_to?(:on_attach)
    raise ArgumentError, "Handler must provide an 'on_attach' method"
  end

  handler.on_attach(self)
end

#bind(address) ⇒ Object

Creates a 0mq socket endpoint for the transport given in the address. Other 0mq sockets may then #connect to this bound endpoint.



88
89
90
91
92
93
# File 'lib/zm/sockets/base.rb', line 88

# Binds the raw socket to the endpoint described by +address+.
#
# The address is recorded in @bindings only after the raw socket reports a
# successful result code, so neither a failed bind nor an exception raised
# while binding leaves a stale entry in the list.
#
# @param address [#to_s] endpoint; converted with #to_s before being handed
#   to the raw socket
# @return [Object] the result code returned by the raw socket's #bind
def bind(address)
  rc = @raw_socket.bind(address.to_s)
  @bindings << address if ZMQ::Util.resultcode_ok?(rc)
  rc
end

#connect(address) ⇒ Object

Connect this 0mq socket to the 0mq socket bound to the endpoint described by the address.



98
99
100
101
102
103
# File 'lib/zm/sockets/base.rb', line 98

# Connects the raw socket to the endpoint described by +address+.
#
# The address is recorded in @connections only after the raw socket reports a
# successful result code, so neither a failed connect nor an exception raised
# while connecting leaves a stale entry in the list.
#
# @param address [#to_s] endpoint; converted with #to_s before being handed
#   to the raw socket
# @return [Object] the result code returned by the raw socket's #connect
def connect(address)
  rc = @raw_socket.connect(address.to_s)
  @connections << address if ZMQ::Util.resultcode_ok?(rc)
  rc
end

#identity ⇒ Object

Retrieve the IDENTITY value assigned to this socket.



136
# File 'lib/zm/sockets/base.rb', line 136

# Reads the identity from the underlying raw socket.
#
# @return [Object] the value returned by the raw socket's #identity
def identity
  @raw_socket.identity
end

#identity=(value) ⇒ Object

Assign a custom IDENTITY value to this socket. The value must be at least 1 byte and at most 255 bytes long.



141
# File 'lib/zm/sockets/base.rb', line 141

# Forwards the given identity to the underlying raw socket.
#
# @param value [Object] identity passed unchanged to the raw socket's #identity=
# @return [Object] the assigned value
def identity=(value)
  @raw_socket.identity = value
end

#initialize(context, handler) ⇒ Object

Raises:

  • (SetsockoptError)



56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/zm/sockets/base.rb', line 56

# Sets up a new socket: stores the context and handler, starts with empty
# binding/connection lists, allocates the raw socket, applies a default
# LINGER option, and finally attaches the handler.
#
# @param context [Object] stored in @context and passed to allocate_socket
# @param handler [Object] stored in @handler and passed to #attach
# @raise [SetsockoptError] when setting ZMQ::LINGER does not return an ok
#   result code
def initialize(context, handler)
  @context     = context
  @handler     = handler
  @bindings    = []
  @connections = []
  @raw_socket  = allocate_socket(@context)

  # Default ZMQ::LINGER to 1 millisecond so that closing a socket does not
  # block forever.
  linger_rc = @raw_socket.setsockopt(ZMQ::LINGER, 1)
  unless ZMQ::Util.resultcode_ok?(linger_rc)
    raise SetsockoptError.new("Setting LINGER on socket failed at line #{caller(0)}")
  end

  attach(@handler)
end

#inspect ⇒ Object



174
175
176
# File 'lib/zm/sockets/base.rb', line 174

def inspect
  "kind [#{@kind}] poll options [#{@poll_options}]"
end

#resume_read ⇒ Object

Used by the reactor. Never called by user code.



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/zm/sockets/base.rb', line 147

# Drains the raw socket: keeps doing nonblocking multi-part receives and
# passes each successfully received set of parts to the handler's
# #on_readable. The loop ends at the first receive whose result code is not
# ok; in that case the handler's #on_readable_error is called only when the
# failure is not EAGAIN and valid_socket_error? is true.
#
# @return [nil]
def resume_read
  loop do
    parts = []
    rc = @raw_socket.recvmsgs(parts, ZMQ::Util.nonblocking_flag)

    if ZMQ::Util.resultcode_ok?(rc)
      @handler.on_readable(self, parts)
      next
    end

    # A failed receive always ends the loop; EAGAIN is checked first and is
    # not reported to the handler.
    @handler.on_readable_error(self, rc) if !eagain? && valid_socket_error?
    break
  end
end

#resume_write ⇒ Object

Used by the reactor. Never called by user code.



170
171
172
# File 'lib/zm/sockets/base.rb', line 170

# Notifies the handler that this socket is writable by calling its
# #on_writable callback with this socket.
#
# @return [Object] whatever the handler's #on_writable returns
def resume_write
  writable_handler = @handler
  writable_handler.on_writable(self)
end

#send_message(message, multipart = false) ⇒ Object

Called to send a ZMQ::Message that was populated with data.

Returns 0+ on success, -1 for failure. Use ZMQ::Util.resultcode_ok? and ZMQ::Util.errno to check for errors.



110
111
112
113
# File 'lib/zm/sockets/base.rb', line 110

# Sends +message+ on the raw socket using the nonblocking flag, OR-ing in
# ZMQ::SNDMORE when +multipart+ is truthy.
#
# @param message [Object] passed unchanged to the raw socket's #send
# @param multipart [Boolean] whether to add ZMQ::SNDMORE to the flags
# @return [Object] the value returned by the raw socket's #send
def send_message(message, multipart = false)
  flags = ZMQ::Util.nonblocking_flag
  flags |= ZMQ::SNDMORE if multipart
  @raw_socket.send(message, flags)
end

#send_message_string(message, multipart = false) ⇒ Object

Convenience method to send a string on the socket. It handles the creation of a ZMQ::Message and populates it appropriately.

Returns 0+ for success, -1 for failure. Check ZMQ::Util.errno for details on the error.



121
122
123
# File 'lib/zm/sockets/base.rb', line 121

# Sends +message+ through the raw socket's #send_string using the nonblocking
# flag, OR-ing in ZMQ::SNDMORE when +multipart+ is truthy.
#
# @param message [Object] passed unchanged to the raw socket's #send_string
# @param multipart [Boolean] whether to add ZMQ::SNDMORE to the flags
# @return [Object] the value returned by the raw socket's #send_string
def send_message_string(message, multipart = false)
  more_bit = multipart ? ZMQ::SNDMORE : 0
  @raw_socket.send_string(message, ZMQ::Util.nonblocking_flag | more_bit)
end

#send_messages(messages) ⇒ Object

Convenience method for sending a multi-part message. The messages argument must implement Enumerable.



128
129
130
# File 'lib/zm/sockets/base.rb', line 128

# Passes +messages+ to the raw socket's #sendmsgs in a single call.
#
# NOTE(review): unlike the other send helpers here, no nonblocking flag is
# passed to the raw socket — confirm that this is intended.
#
# @param messages [Object] passed unchanged to the raw socket's #sendmsgs
# @return [Object] the value returned by the raw socket's #sendmsgs
def send_messages(messages)
  outgoing = messages
  @raw_socket.sendmsgs(outgoing)
end