Class: EventMachine::ZeroMQ::Connection
- Inherits: Connection (Object > Connection > EventMachine::ZeroMQ::Connection)
- Defined in: lib/em-zeromq/connection.rb
Instance Attribute Summary
- (Object) address (readonly)
  Returns the value of attribute address.
- (Object) handler
  Returns the value of attribute handler.
- (Object) on_readable
  Returns the value of attribute on_readable.
- (Object) on_writable
  Returns the value of attribute on_writable.
- (Object) socket (readonly)
  Returns the value of attribute socket.
- (Object) socket_type (readonly)
  Returns the value of attribute socket_type.
Class Method Summary
+ (Object) map_sockopt(opt, name)
Instance Method Summary
- (Object) bind(address)
  User method.
- (Object) connect(address)
- (Connection) initialize(socket, socket_type, address, handler) (constructor)
  A new instance of Connection.
- (Object) notify_readable
- (Object) notify_writable
- (Boolean) readable?
- (Object) register_readable
  Make this socket available for reads.
- (Object) register_writable
  Trigger on_writable when socket is writable.
- (Object) send_msg(*parts)
  Send a non-blocking message. parts: if only one argument is given, a single-part message is sent.
- (Object) setsockopt(opt, value)
- (Object) subscribe(what = '')
- (Object) unbind
  Clean up when ending loop.
- (Object) unsubscribe(what)
- (Boolean) writable?
Constructor Details
- (Connection) initialize(socket, socket_type, address, handler)
A new instance of Connection
    # File 'lib/em-zeromq/connection.rb', line 7
    def initialize(socket, socket_type, address, handler)
      @socket = socket
      @socket_type = socket_type
      @handler = handler
      @address = address
    end
Instance Attribute Details
- (Object) address (readonly)
Returns the value of attribute address
    # File 'lib/em-zeromq/connection.rb', line 5
    def address
      @address
    end
- (Object) handler
Returns the value of attribute handler
    # File 'lib/em-zeromq/connection.rb', line 4
    def handler
      @handler
    end
- (Object) on_readable
Returns the value of attribute on_readable
    # File 'lib/em-zeromq/connection.rb', line 4
    def on_readable
      @on_readable
    end
- (Object) on_writable
Returns the value of attribute on_writable
    # File 'lib/em-zeromq/connection.rb', line 4
    def on_writable
      @on_writable
    end
- (Object) socket (readonly)
Returns the value of attribute socket
    # File 'lib/em-zeromq/connection.rb', line 5
    def socket
      @socket
    end
- (Object) socket_type (readonly)
Returns the value of attribute socket_type
    # File 'lib/em-zeromq/connection.rb', line 5
    def socket_type
      @socket_type
    end
Class Method Details
+ (Object) map_sockopt(opt, name)
    # File 'lib/em-zeromq/connection.rb', line 14
    def self.map_sockopt(opt, name)
      define_method(name){ @socket.getsockopt(opt) }
      define_method("#{name}="){|val| @socket.setsockopt(opt, val) }
    end
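To illustrate what map_sockopt generates, here is a minimal sketch of the same define_method pattern. FakeSocket, SketchConnection, the option number HWM_OPT and the accessor name hwm are stand-ins invented for this example; they are not part of em-zeromq.

```ruby
# Stand-in for a ZMQ socket: keeps options in a hash (hypothetical).
class FakeSocket
  def initialize
    @opts = {}
  end

  def getsockopt(opt)
    @opts[opt]
  end

  def setsockopt(opt, val)
    @opts[opt] = val
  end
end

# Hypothetical class reusing the metaprogramming pattern shown above.
class SketchConnection
  HWM_OPT = 1 # assumed option number, for illustration only

  # Defines a reader and a writer that both delegate to the socket.
  def self.map_sockopt(opt, name)
    define_method(name) { @socket.getsockopt(opt) }
    define_method("#{name}=") { |val| @socket.setsockopt(opt, val) }
  end

  map_sockopt(HWM_OPT, :hwm)

  def initialize(socket)
    @socket = socket
  end
end

conn = SketchConnection.new(FakeSocket.new)
conn.hwm = 100
puts conn.hwm
```

Each call to map_sockopt adds one getter and one setter, so socket options can be read and written like ordinary attributes.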
Instance Method Details
- (Object) bind(address)
User method: binds the underlying socket to the given address.
    # File 'lib/em-zeromq/connection.rb', line 32
    def bind(address)
      @socket.bind(address)
    end
- (Object) connect(address)
    # File 'lib/em-zeromq/connection.rb', line 36
    def connect(address)
      @socket.connect(address)
    end
- (Object) notify_readable
    # File 'lib/em-zeromq/connection.rb', line 105
    def notify_readable
      # Not sure if this is actually necessary. I suppose it prevents us
      # from having to instantiate a ZMQ::Message unnecessarily.
      # I'm leaving this in because it's in the docs, but it could probably
      # be taken out.
      return unless readable?

      loop do
        msg_parts = []
        msg = get_message
        if msg
          msg_parts << msg
          while @socket.more_parts?
            msg = get_message
            if msg
              msg_parts << msg
            else
              raise "Multi-part message missing a message!"
            end
          end

          @handler.on_readable(self, msg_parts)
        else
          break
        end
      end
    end
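The draining loop in notify_readable can be sketched without ZeroMQ at all. In the example below, FakeSocket, recv_nonblock and drain are hypothetical names made up for illustration; only more_parts? mirrors a call that appears in the listing above.

```ruby
# Hypothetical socket: each queued frame is [payload, more_flag].
class FakeSocket
  def initialize(frames)
    @frames = frames.dup
    @more = false
  end

  # Returns the next payload, or nil when nothing is queued.
  def recv_nonblock
    payload, @more = @frames.shift
    payload
  end

  def more_parts?
    @more ? true : false
  end
end

# Collects complete multipart messages until the socket runs dry.
def drain(socket)
  messages = []
  loop do
    first = socket.recv_nonblock
    break unless first

    parts = [first]
    while socket.more_parts?
      part = socket.recv_nonblock
      raise "Multi-part message missing a message!" unless part
      parts << part
    end
    messages << parts
  end
  messages
end

socket = FakeSocket.new([["a", true], ["b", false], ["c", false]])
p drain(socket) # prints [["a", "b"], ["c"]]
```

Reading until the socket is empty matters because the notification is edge-like: a handler that read only one message per notification could leave later messages stranded in the queue.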
- (Object) notify_writable
    # File 'lib/em-zeromq/connection.rb', line 133
    def notify_writable
      return unless writable?

      # once a writable event is successfully received the socket
      # should be accepting messages again so stop triggering
      # write events
      self.notify_writable = false

      if @handler.respond_to?(:on_writable)
        @handler.on_writable(self)
      end
    end
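As the listings show, on_readable is called on the handler unconditionally, while on_writable is called only if the handler responds to it. A minimal sketch of handlers written against that contract follows; the class names and the dispatch_writable helper are hypothetical.

```ruby
# Hypothetical handler that only implements the required callback.
class EchoHandler
  attr_reader :received

  def initialize
    @received = []
  end

  def on_readable(connection, parts)
    @received << parts
  end
end

# Hypothetical handler that also opts in to writable events.
class WritableAwareHandler < EchoHandler
  attr_reader :writable_events

  def initialize
    super
    @writable_events = 0
  end

  def on_writable(connection)
    @writable_events += 1
  end
end

# Mirrors the respond_to? guard used by notify_writable.
def dispatch_writable(handler, connection)
  handler.on_writable(connection) if handler.respond_to?(:on_writable)
end

plain = EchoHandler.new
aware = WritableAwareHandler.new
dispatch_writable(plain, :conn) # skipped, no on_writable defined
dispatch_writable(aware, :conn)
puts aware.writable_events
```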
- (Boolean) readable?
    # File 'lib/em-zeromq/connection.rb', line 145
    def readable?
      (@socket.getsockopt(ZMQ::EVENTS) & ZMQ::POLLIN) == ZMQ::POLLIN
    end
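The readable? check is a plain bitmask test on the socket's event flags. The sketch below uses locally defined POLLIN and POLLOUT constants with assumed values rather than the real ZMQ constants, and flag_set? is a hypothetical helper.

```ruby
# Assumed flag values, defined locally for illustration only.
POLLIN  = 1
POLLOUT = 2

# True when every bit of flag is present in events.
def flag_set?(events, flag)
  (events & flag) == flag
end

puts flag_set?(POLLIN | POLLOUT, POLLIN) # readable and writable
puts flag_set?(POLLOUT, POLLIN)          # writable only
```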
- (Object) register_readable
Make this socket available for reads
    # File 'lib/em-zeromq/connection.rb', line 90
    def register_readable
      # Since ZMQ is event triggered I think this is necessary
      if readable?
        notify_readable
      end
      # Subscribe to EM read notifications
      self.notify_readable = true
    end
- (Object) register_writable
Trigger on_writable when socket is writable
    # File 'lib/em-zeromq/connection.rb', line 100
    def register_writable
      # Subscribe to EM write notifications
      self.notify_writable = true
    end
- (Object) send_msg(*parts)
Sends a non-blocking message.
parts: if only one argument is given, a single-part message is sent; if more than one argument is given, a multipart message is sent.
return: true if the message was queued, false otherwise.
    # File 'lib/em-zeromq/connection.rb', line 56
    def send_msg(*parts)
      parts = Array(parts[0]) if parts.size == 0
      sent = true

      # multipart
      parts[0...-1].each do |msg|
        sent = @socket.send_string(msg, ZMQ::NOBLOCK | ZMQ::SNDMORE)
        if sent == false
          break
        end
      end

      if sent
        # all the previous parts were queued, send
        # the last one
        @socket.send_string(parts[-1], ZMQ::NOBLOCK)
      else
        # error while sending the previous parts
        # register the socket for writability
        self.notify_writable = true
        false
      end
    end
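The flag pattern used by send_msg (every part except the last carries the "more" flag) can be sketched with a recording stand-in. RecordingSocket, send_parts and the numeric flag values below are assumptions made for this example. Unlike send_msg, the sketch does not register for writability on failure.

```ruby
NOBLOCK = 1 # assumed flag values, for illustration only
SNDMORE = 2

# Hypothetical socket that records frames and can be told to refuse one.
class RecordingSocket
  attr_reader :sent

  def initialize(fail_at: nil)
    @sent = []
    @fail_at = fail_at
  end

  # Returns false instead of queueing when the failure index is reached.
  def send_string(str, flags)
    return false if @sent.size == @fail_at

    @sent << [str, flags]
    true
  end
end

# Sends all but the last part with SNDMORE, then the last part alone.
def send_parts(socket, parts)
  ok = parts[0...-1].all? { |part| socket.send_string(part, NOBLOCK | SNDMORE) }
  ok && socket.send_string(parts[-1], NOBLOCK)
end

socket = RecordingSocket.new
send_parts(socket, ["topic", "header", "body"])
p socket.sent # prints [["topic", 3], ["header", 3], ["body", 1]]
```

Stopping at the first refused part avoids queueing the tail of a message whose head was never accepted.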
- (Object) setsockopt(opt, value)
    # File 'lib/em-zeromq/connection.rb', line 80
    def setsockopt(opt, value)
      @socket.setsockopt(opt, value)
    end
- (Object) subscribe(what = '')
    # File 'lib/em-zeromq/connection.rb', line 40
    def subscribe(what = '')
      raise "only valid on sub socket type (was #{@socket.name})" unless @socket.name == 'SUB'
      @socket.setsockopt(ZMQ::SUBSCRIBE, what)
    end
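ZeroMQ SUB sockets filter incoming messages by prefix, which is why the default argument of '' subscribes to everything. The sketch below models that matching rule in plain Ruby; delivered? is a hypothetical helper, not an em-zeromq or ZeroMQ call.

```ruby
# True when the message starts with at least one subscribed prefix.
def delivered?(subscriptions, message)
  subscriptions.any? { |prefix| message.start_with?(prefix) }
end

puts delivered?(["weather."], "weather.london 12C") # matches the prefix
puts delivered?(["weather."], "sports.scores 2-1")  # filtered out
puts delivered?([""], "anything at all")            # empty prefix matches all
```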
- (Object) unbind
Cleans up when the event loop ends
    # File 'lib/em-zeromq/connection.rb', line 85
    def unbind
      detach_and_close
    end
- (Object) unsubscribe(what)
    # File 'lib/em-zeromq/connection.rb', line 45
    def unsubscribe(what)
      raise "only valid on sub socket type (was #{@socket.name})" unless @socket.name == 'SUB'
      @socket.setsockopt(ZMQ::UNSUBSCRIBE, what)
    end
- (Boolean) writable?
    # File 'lib/em-zeromq/connection.rb', line 149
    def writable?
      return true
      # ZMQ::EVENTS has issues in ZMQ HEAD, we'll ignore this till they're fixed
      # (@socket.getsockopt(ZMQ::EVENTS) & ZMQ::POLLOUT) == ZMQ::POLLOUT
    end