Module: ZMQMachine::Socket::Base

Included in:
Pair, Pub, Rep, Req, Sub, XRep, XReq
Defined in:
lib/zm/sockets/base.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#kindObject (readonly)

Returns the value of attribute kind.



42
43
44
# File 'lib/zm/sockets/base.rb', line 42

def kind
  @kind
end

#poll_optionsObject (readonly)

Returns the value of attribute poll_options.



43
44
45
# File 'lib/zm/sockets/base.rb', line 43

def poll_options
  @poll_options
end

#raw_socketObject (readonly)

Returns the value of attribute raw_socket.



42
43
44
# File 'lib/zm/sockets/base.rb', line 42

def raw_socket
  @raw_socket
end

Instance Method Details

#attach(handler) ⇒ Object

Call the handler’s #on_attach method and pass itself so the handler may complete its setup.

The #on_attach method is passed a single argument named socket. The method should probably #bind or #connect to an address and potentially schedule (via timer) an operation or begin sending messages immediately.

Raises:

  • (ArgumentError)


64
65
66
67
# File 'lib/zm/sockets/base.rb', line 64

def attach handler
  raise ArgumentError, "Handler must provide an 'on_attach' method" unless handler.respond_to? :on_attach
  handler.on_attach self
end

#bind(address) ⇒ Object

Creates a 0mq socket endpoint for the transport given in the address. Other 0mq sockets may then #connect to this bound endpoint.



73
74
75
76
77
78
79
80
81
82
# File 'lib/zm/sockets/base.rb', line 73

def bind address
  begin
    @bindings << address
    @raw_socket.bind address.to_s
    true
  rescue ZMQ::ZeroMQError
    @bindings.pop
    false
  end
end

#connect(address) ⇒ Object

Connect this 0mq socket to the 0mq socket bound to the endpoint described by the address.



87
88
89
90
91
92
93
94
95
96
# File 'lib/zm/sockets/base.rb', line 87

def connect address
  begin
    @connections << address
    @raw_socket.connect address.to_s
    true
  rescue ZMQ::ZeroMQError
    @connections.pop
    false
  end
end

#identityObject

Retrieve the IDENTITY value assigned to this socket.



149
# File 'lib/zm/sockets/base.rb', line 149

def identity() @raw_socket.identity; end

#identity=(value) ⇒ Object

Assign a custom IDENTITY value to this socket. Limit is 255 bytes and must be greater than 0 bytes.



154
# File 'lib/zm/sockets/base.rb', line 154

def identity=(value) @raw_socket.identity = value; end

#initialize(context, handler) ⇒ Object



45
46
47
48
49
50
51
52
53
54
# File 'lib/zm/sockets/base.rb', line 45

def initialize context, handler
  @state = :init
  @context = context
  @bindings = []
  @connections = []

  @handler = handler
  @raw_socket = allocate_socket @context
  attach @handler
end

#inspectObject



192
193
194
# File 'lib/zm/sockets/base.rb', line 192

def inspect
  "kind [#{@kind}] poll options [#{@poll_options}] state [#{@state}]"
end

#resume_readObject

Used by the reactor. Never called by user code.

FIXME: need to rework all of this rc stuff. The underlying lib returns nil when a NOBLOCK socket gets EAGAIN. It returns true when a message was successfully dequeued. The use of rc here is really ugly and wrong.



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/zm/sockets/base.rb', line 162

def resume_read
  rc = 0
  
  # loop and deliver all messages until the socket returns EAGAIN
  while 0 == rc
    messages = []
    rc = read_message_part messages
    #puts "resume_read: rc1 [#{rc}], more_parts? [#{@raw_socket.more_parts?}]"

    while 0 == rc && @raw_socket.more_parts?
      #puts "get next part"
      rc = read_message_part messages
      #puts "resume_read: rc2 [#{rc}]"
    end
    #puts "no more parts, ready to deliver"

    # only deliver the messages when rc is 0; otherwise, we
    # may have gotten EAGAIN and no message was read;
    # don't deliver empty messages
    deliver messages, rc if 0 == rc
  end
end

#resume_writeObject

Used by the reactor. Never called by user code.



187
188
189
190
# File 'lib/zm/sockets/base.rb', line 187

def resume_write
  @state = :ready
  @handler.on_writable self
end

#send_message(message, multipart = false) ⇒ Object

Called to send a ZMQ::Message that was populated with data.

Returns true on success, false otherwise.

May raise a ZMQ::SocketError.



104
105
106
107
# File 'lib/zm/sockets/base.rb', line 104

def send_message message, multipart = false
  queued = @raw_socket.send message, ZMQ::NOBLOCK | (multipart ? ZMQ::SNDMORE : 0)
  queued
end

#send_message_string(message, multipart = false) ⇒ Object

Convenience method to send a string on the socket. It handles the creation of a ZMQ::Message and populates it appropriately.

Returns true on success, false otherwise.

May raise a ZMQ::SocketError.



116
117
118
119
# File 'lib/zm/sockets/base.rb', line 116

def send_message_string message, multipart = false
  queued = @raw_socket.send_string message, ZMQ::NOBLOCK | (multipart ? ZMQ::SNDMORE : 0)
  queued
end

#send_messages(messages) ⇒ Object

Convenience method for sending a multi-part message. The messages argument must respond to :size, :at and :last (like an Array).

May raise a ZMQ::SocketError.



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/zm/sockets/base.rb', line 127

def send_messages messages
  rc = false
  i = 0
  size = messages.size

  # loop through all messages but the last
  while size > 1 && i < size - 1 do
    rc = send_message messages.at(i), true
    i += 1
  end
  
  # FIXME: bug; if any of the message parts fail (rc != 0) we don't see that here; the
  # #send_message function should capture exceptions and turn them into integers for bubbling

  # send the last message without the multipart arg to flush
  # the message to the 0mq queue
  rc = send_message messages.last if size > 0
  rc
end