Class: EventMachine::ZeroMQ::Connection

Inherits:
Connection
  • Object
show all
Defined in:
lib/em-zeromq/connection.rb

Instance Attribute Summary (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Connection) initialize(socket, socket_type, address, handler)

A new instance of Connection



7
8
9
10
11
12
# File 'lib/em-zeromq/connection.rb', line 7

# Builds a connection wrapper by storing each collaborator in the
# instance variable of the same name.
#
# @param socket [Object] stored in +@socket+
# @param socket_type [Object] stored in +@socket_type+
# @param address [Object] stored in +@address+
# @param handler [Object] stored in +@handler+
def initialize(socket, socket_type, address, handler)
  @socket = socket
  @socket_type = socket_type
  @address = address
  @handler = handler
end

Instance Attribute Details

- (Object) address (readonly)

Returns the value of attribute address



5
6
7
# File 'lib/em-zeromq/connection.rb', line 5

# Read-only accessor for +@address+ (assigned in #initialize).
#
# @return [Object] the current value of +@address+
def address
  @address
end

- (Object) handler

Returns the value of attribute handler



4
5
6
# File 'lib/em-zeromq/connection.rb', line 4

# Accessor for +@handler+ (assigned in #initialize).
#
# @return [Object] the current value of +@handler+
def handler
  @handler
end

- (Object) on_readable

Returns the value of attribute on_readable



4
5
6
# File 'lib/em-zeromq/connection.rb', line 4

# Accessor for +@on_readable+.
# NOTE(review): nothing in the visible code assigns @on_readable -- confirm
# where (and whether) it is set.
#
# @return [Object, nil] the current value of +@on_readable+
def on_readable
  @on_readable
end

- (Object) on_writable

Returns the value of attribute on_writable



4
5
6
# File 'lib/em-zeromq/connection.rb', line 4

# Accessor for +@on_writable+.
# NOTE(review): nothing in the visible code assigns @on_writable -- confirm
# where (and whether) it is set.
#
# @return [Object, nil] the current value of +@on_writable+
def on_writable
  @on_writable
end

- (Object) socket (readonly)

Returns the value of attribute socket



5
6
7
# File 'lib/em-zeromq/connection.rb', line 5

# Read-only accessor for +@socket+ (assigned in #initialize).
#
# @return [Object] the current value of +@socket+
def socket
  @socket
end

- (Object) socket_type (readonly)

Returns the value of attribute socket_type



5
6
7
# File 'lib/em-zeromq/connection.rb', line 5

# Read-only accessor for +@socket_type+ (assigned in #initialize).
#
# @return [Object] the current value of +@socket_type+
def socket_type
  @socket_type
end

Class Method Details

+ (Object) map_sockopt(opt, name)



14
15
16
17
# File 'lib/em-zeromq/connection.rb', line 14

# Defines a reader/writer pair on the class that proxies one socket option.
#
# The generated +name+ method returns +@socket.getsockopt(opt)+ and the
# generated +name=+ method calls +@socket.setsockopt(opt, value)+.
#
# @param opt [Object] option identifier handed to getsockopt/setsockopt
# @param name [Symbol, String] base name of the generated methods
def self.map_sockopt(opt, name)
  define_method(name) do
    @socket.getsockopt(opt)
  end

  define_method("#{name}=") do |value|
    @socket.setsockopt(opt, value)
  end
end

Instance Method Details

- (Object) bind(address)

User method



32
33
34
# File 'lib/em-zeromq/connection.rb', line 32

# Forwards +address+ to +@socket.bind+ and returns whatever that call returns.
#
# @param address [Object] passed through unchanged to the socket
def bind(address)
  @socket.bind(address)
end

- (Object) connect(address)



36
37
38
# File 'lib/em-zeromq/connection.rb', line 36

# Forwards +address+ to +@socket.connect+ and returns whatever that call
# returns.
#
# @param address [Object] passed through unchanged to the socket
def connect(address)
  @socket.connect(address)
end

- (Object) notify_readable



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/em-zeromq/connection.rb', line 105

# Drains every message currently available and hands each one, as an array
# of its parts, to +@handler.on_readable(self, parts)+.
#
# Reading stops as soon as +get_message+ returns a falsy value.
#
# @raise [RuntimeError] if the socket reports more parts but +get_message+
#   returns nothing for the next part
def notify_readable
  # Possibly redundant: checking first avoids asking for a message when
  # nothing is pending. Kept because the upstream docs recommend it.
  return unless readable?

  while (first_part = get_message)
    frames = [first_part]

    # Collect the remaining parts of a multi-part message.
    while @socket.more_parts?
      frame = get_message
      raise "Multi-part message missing a message!" unless frame

      frames << frame
    end

    @handler.on_readable(self, frames)
  end
end

- (Object) notify_writable



133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/em-zeromq/connection.rb', line 133

# Reacts to a writable event: turns further write notifications off and, when
# the handler implements +on_writable+, calls +@handler.on_writable(self)+.
#
# Does nothing when +writable?+ is false.
def notify_writable
  return unless writable?

  # Once a writable event has been received the socket should be accepting
  # messages again, so stop triggering write events.
  self.notify_writable = false

  @handler.on_writable(self) if @handler.respond_to?(:on_writable)
end

- (Boolean) readable?

Returns:

  • (Boolean)


145
146
147
# File 'lib/em-zeromq/connection.rb', line 145

# Reports whether the POLLIN bit is set in the socket's ZMQ::EVENTS option.
#
# @return [Boolean]
def readable?
  pending_events = @socket.getsockopt(ZMQ::EVENTS)
  (pending_events & ZMQ::POLLIN) == ZMQ::POLLIN
end

- (Object) register_readable

Make this socket available for reads



90
91
92
93
94
95
96
97
# File 'lib/em-zeromq/connection.rb', line 90

# Makes this socket available for reads: processes anything already pending,
# then turns read notifications on.
def register_readable
  # ZMQ is event triggered, so messages that are already waiting are
  # (presumably) not announced again; drain them up front.
  notify_readable if readable?

  # Subscribe to EM read notifications.
  self.notify_readable = true
end

- (Object) register_writable

Subscribe this socket to EM write notifications



100
101
102
103
# File 'lib/em-zeromq/connection.rb', line 100

# Turns write notifications on for this connection by setting
# +notify_writable+ to true.
def register_writable
  # Subscribe to EM write notifications
  self.notify_writable = true
end

- (Object) send_msg(*parts)

Send a non-blocking message. parts: if only one argument is given, a single-part message is sent;

if more than one argument is given, a multipart message is sent.

Returns: true if the message was queued, false otherwise.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/em-zeromq/connection.rb', line 56

# Sends a message without blocking.
#
# With one argument a single-part message is sent; with several arguments a
# multipart message is sent, every part but the last flagged with
# ZMQ::SNDMORE. A single array argument is treated as the list of parts, so
# <tt>send_msg(%w[a b])</tt> is equivalent to <tt>send_msg('a', 'b')</tt>.
#
# @param parts [Array] the message parts, or one array holding them
# @return [Object] the result of sending the last part when every earlier
#   part was accepted; +false+ when an earlier part was refused (the
#   connection is then registered for write notifications)
def send_msg(*parts)
  # The previous guard tested +parts.size == 0+, which could never unwrap
  # anything; a lone array was handed to send_string as one bogus part.
  if parts.size == 1 && parts.first.respond_to?(:to_ary)
    parts = parts.first.to_ary
  end

  more_flags = ZMQ::NOBLOCK | ZMQ::SNDMORE
  sent = true

  # Every part except the last one announces that more parts follow.
  parts[0...-1].each do |part|
    sent = @socket.send_string(part, more_flags)
    break if sent == false
  end

  if sent
    # All previous parts were queued; send the last one.
    @socket.send_string(parts[-1], ZMQ::NOBLOCK)
  else
    # A previous part was refused: ask to be told when the socket is
    # writable again.
    self.notify_writable = true
    false
  end
end

- (Object) setsockopt(opt, value)



80
81
82
# File 'lib/em-zeromq/connection.rb', line 80

# Forwards to +@socket.setsockopt+ and returns whatever that call returns.
#
# @param opt [Object] option identifier, passed through unchanged
# @param value [Object] option value, passed through unchanged
def setsockopt(opt, value)
  @socket.setsockopt(opt, value)
end

- (Object) subscribe(what = '')



40
41
42
43
# File 'lib/em-zeromq/connection.rb', line 40

# Sets the ZMQ::SUBSCRIBE option on the socket.
#
# @param what [String] value handed to the SUBSCRIBE option (defaults to '')
# @raise [RuntimeError] when the socket's +name+ is not 'SUB'
def subscribe(what = '')
  unless @socket.name == 'SUB'
    raise "only valid on sub socket type (was #{@socket.name})"
  end

  @socket.setsockopt(ZMQ::SUBSCRIBE, what)
end

- (Object) unbind

cleanup when ending loop



85
86
87
# File 'lib/em-zeromq/connection.rb', line 85

# Cleanup hook: delegates to +detach_and_close+, which is defined outside the
# visible portion of this class.
# NOTE(review): presumably invoked by EventMachine when the connection ends --
# confirm against the caller.
def unbind
  detach_and_close
end

- (Object) unsubscribe(what)



45
46
47
48
# File 'lib/em-zeromq/connection.rb', line 45

# Sets the ZMQ::UNSUBSCRIBE option on the socket.
#
# @param what [String] value handed to the UNSUBSCRIBE option
# @raise [RuntimeError] when the socket's +name+ is not 'SUB'
def unsubscribe(what)
  unless @socket.name == 'SUB'
    raise "only valid on sub socket type (was #{@socket.name})"
  end

  @socket.setsockopt(ZMQ::UNSUBSCRIBE, what)
end

- (Boolean) writable?

Returns:

  • (Boolean)


149
150
151
152
153
# File 'lib/em-zeromq/connection.rb', line 149

# Always reports the socket as writable.
#
# @return [Boolean] always +true+
def writable?
  # ZMQ::EVENTS has issues in ZMQ HEAD, so the real check is disabled until
  # they are fixed:
  # (@socket.getsockopt(ZMQ::EVENTS) & ZMQ::POLLOUT) == ZMQ::POLLOUT
  true
end