Class: ASIR::Transport::Zmq
- Inherits: ConnectionOriented
- Ancestry (base class first):
  - Object
  - ConnectionOriented
  - ASIR::Transport::Zmq
- Defined in:
- lib/asir/transport/zmq.rb
Overview
!SLIDE ZeroMQ Transport
Instance Attribute Summary
-
#queue ⇒ Object
Returns the value of attribute queue.
Instance Method Summary
-
#_client_connect! ⇒ Object
!SLIDE 0MQ client.
- #_read(stream, state) ⇒ Object
- #_receive_result(state) ⇒ Object
- #_send_result(state) ⇒ Object
-
#_server! ⇒ Object
!SLIDE 0MQ server.
-
#_server_accept_connection!(server) ⇒ Object
server represents a receiving ZMQ endpoint.
-
#_server_close_connection!(in_stream, out_stream) ⇒ Object
Nothing to be closed for ZMQ.
- #_write(payload, stream, state) ⇒ Object
- #queue_ ⇒ Object
- #stream_eof?(stream) ⇒ Boolean
- #zmq_context ⇒ Object
- #zmq_uri ⇒ Object
Instance Attribute Details
#queue ⇒ Object
Returns the value of attribute queue.
10 11 12 |
# File 'lib/asir/transport/zmq.rb', line 10
# Returns the value of attribute queue (nil until it has been assigned).
def queue
  @queue
end
Instance Method Details
#_client_connect! ⇒ Object
!SLIDE 0MQ client.
14 15 16 17 18 19 20 |
# File 'lib/asir/transport/zmq.rb', line 14
# !SLIDE 0MQ client.
#
# Creates a client socket from zmq_context and connects it to zmq_uri.
# A one-way transport uses a ZMQ::PUB socket; otherwise ZMQ::REQ is used.
#
# Returns the connected socket.
#
# Raises an exception of the same class as the underlying failure, with
# this transport's class name and zmq_uri prepended to the original
# message; the original backtrace is preserved.
def _client_connect!
  socket_type = one_way ? ZMQ::PUB : ZMQ::REQ
  sock = zmq_context.socket(socket_type)
  sock.connect(zmq_uri)
  sock
rescue ::Exception => exc
  # Always re-raised (never swallowed): only the message gains context.
  raise exc.class, "#{self.class} #{zmq_uri}: #{exc.message}", exc.backtrace
end
#_read(stream, state) ⇒ Object
53 54 55 56 57 58 |
# File 'lib/asir/transport/zmq.rb', line 53
# Receives one message from stream via recv(0).
#
# For a one-way transport the text up to the first space is dropped and
# only the remainder is returned (presumably the queue prefix that _write
# prepends; if the message contains no space the result is nil).
# The state argument is not used here.
def _read(stream, state)
  data = stream.recv(0)
  return data unless data && one_way

  _prefix, body = data.split(/ /, 2)
  body
end
#_receive_result(state) ⇒ Object
33 34 35 36 |
# File 'lib/asir/transport/zmq.rb', line 33
# Receives the result for state, unless no result is expected.
#
# Returns nil without touching the stream when either this transport is
# one-way or the message carried by state is marked one-way; otherwise
# defers to the superclass implementation.
def _receive_result(state)
  return nil if one_way || state.message.one_way
  super
end
#_send_result(state) ⇒ Object
38 39 40 41 |
# File 'lib/asir/transport/zmq.rb', line 38
# Sends the result for state, unless no result is expected.
#
# Returns nil without sending anything when either this transport is
# one-way or the message carried by state is marked one-way; otherwise
# defers to the superclass implementation.
def _send_result(state)
  return nil if one_way || state.message.one_way
  super
end
#_server! ⇒ Object
!SLIDE 0MQ server.
24 25 26 27 28 29 30 31 |
# File 'lib/asir/transport/zmq.rb', line 24
# !SLIDE 0MQ server.
#
# Creates the server socket from zmq_context, binds it on every interface
# at port, and stores it in @server. A one-way transport uses a ZMQ::SUB
# socket subscribed to queue; otherwise a ZMQ::REP socket is used.
#
# Returns the bound socket.
#
# Raises an exception of the same class as the underlying failure, with
# this transport's class name and zmq_uri prepended to the original
# message; the original backtrace is preserved.
def _server!
  socket_type = one_way ? ZMQ::SUB : ZMQ::REP
  sock = zmq_context.socket(socket_type)
  sock.setsockopt(ZMQ::SUBSCRIBE, queue) if one_way
  # Original author's open question: why doesn't tcp://localhost:PORT work?
  # Until that is answered the wildcard address is used.
  sock.bind("tcp://*:#{port}")
  @server = sock
rescue ::Exception => exc
  # Always re-raised (never swallowed): only the message gains context.
  raise exc.class, "#{self.class} #{zmq_uri}: #{exc.message}", exc.backtrace
end
#_server_accept_connection!(server) ⇒ Object
server represents a receiving ZMQ endpoint.
79 80 81 |
# File 'lib/asir/transport/zmq.rb', line 79
# server represents a receiving ZMQ endpoint.
#
# Returns a two-element array: server itself, followed by server again
# when replies are possible or nil when @one_way is set (presumably the
# [in_stream, out_stream] pair later handed to _server_close_connection!).
def _server_accept_connection!(server)
  reply_stream = @one_way ? nil : server
  [server, reply_stream]
end
#_server_close_connection!(in_stream, out_stream) ⇒ Object
Nothing to be closed for ZMQ.
91 92 93 |
# File 'lib/asir/transport/zmq.rb', line 91
# Nothing to be closed for ZMQ: both arguments are ignored and nil is
# returned.
def _server_close_connection!(in_stream, out_stream)
  # Intentionally a no-op.
  nil
end
#_write(payload, stream, state) ⇒ Object
43 44 45 46 47 48 49 50 51 |
# File 'lib/asir/transport/zmq.rb', line 43
# Sends payload on stream and returns stream.
#
# For a one-way transport the payload is sent with a queue prefix followed
# by a space. The prefix comes from the message carried by state
# (message[:zmq_queue], then message[:queue]); when state, its message, or
# both keys are absent, queue_ is used instead.
#
# The caller's payload string is never modified: the prefix is applied to
# a copy, so frozen payloads work and a payload that is written again is
# not prefixed twice.
def _write(payload, stream, state)
  if one_way
    message = state && state.message
    queue_name = message && (message[:zmq_queue] || message[:queue])
    prefix = queue_name ? "#{queue_name} " : queue_
    payload = payload.dup.insert(0, prefix)
  end
  stream.send payload, 0
  stream
end
#queue_ ⇒ Object
73 74 75 76 |
# File 'lib/asir/transport/zmq.rb', line 73
# Returns the frozen, memoized message prefix: queue followed by a single
# space, or "asir " when queue is empty.
def queue_
  return @queue_ if @queue_

  prefix = queue.empty? ? "asir " : queue + " "
  @queue_ = prefix.freeze
end
#stream_eof?(stream) ⇒ Boolean
86 87 88 |
# File 'lib/asir/transport/zmq.rb', line 86
# Always reports false: this transport never signals end-of-file for a
# stream, whatever stream is given.
def stream_eof?(stream)
  false
end
#zmq_context ⇒ Object
104 105 106 107 |
# File 'lib/asir/transport/zmq.rb', line 104
# Returns the ZMQ::Context held in the @@zmq_context class variable,
# creating it with ZMQ::Context.new(1) on first use.
# NOTE(review): the argument 1 is presumably the I/O thread count —
# confirm against the ZMQ binding in use.
def zmq_context
  unless defined?(@@zmq_context) && @@zmq_context
    @@zmq_context = ZMQ::Context.new(1)
  end
  @@zmq_context
end
#zmq_uri ⇒ Object
95 96 97 98 99 100 101 102 |
# File 'lib/asir/transport/zmq.rb', line 95
# Returns uri with its path component blanked out, as a frozen string.
# The value is computed once and memoized in @zmq_uri.
def zmq_uri
  return @zmq_uri if @zmq_uri

  endpoint = URI.parse(uri)
  endpoint.path = ''
  @zmq_uri = endpoint.to_s.freeze
end