Class: ASIR::Transport::Zmq

Inherits:
ConnectionOriented
  • Object
show all
Defined in:
lib/asir/transport/zmq.rb

Overview

!SLIDE ZeroMQ Transport

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#queue ⇒ Object

Returns the value of attribute queue.



10
11
12
# File 'lib/asir/transport/zmq.rb', line 10

# Reader for the +queue+ attribute.
#
# Returns whatever is currently stored in @queue (nil when it has not
# been assigned).
def queue; @queue; end

Instance Method Details

#_client_connect! ⇒ Object

!SLIDE 0MQ client.



14
15
16
17
18
19
20
# File 'lib/asir/transport/zmq.rb', line 14

# Creates the client-side 0MQ socket and connects it to +zmq_uri+.
#
# A PUB socket is created when +one_way+ is truthy, otherwise a REQ socket.
#
# Returns the connected socket.
#
# Raises the same exception class that the underlying call raised, with the
# message prefixed by this transport's class name and +zmq_uri+.
def _client_connect!
  sock = nil
  sock = zmq_context.socket(one_way ? ZMQ::PUB : ZMQ::REQ)
  sock.connect(zmq_uri)
  sock
rescue ::StandardError => exc
  # Only StandardError is decorated: re-raising SystemExit or signal
  # exceptions by class with a new message would discard their state
  # (e.g. the exit status), so those propagate untouched.
  #
  # Release the half-initialized socket instead of leaking it.
  sock.close if sock
  raise exc.class, "#{self.class} #{zmq_uri}: #{exc.message}", exc.backtrace
end

#_read(stream, state) ⇒ Object



53
54
55
56
57
58
# File 'lib/asir/transport/zmq.rb', line 53

# Receives one message from +stream+ via +recv(0)+.
#
# In one-way mode the received data is split on the first space and only
# the part after it is returned (the leading token is discarded). If the
# data contains no space, the result is nil.
#
# stream - object responding to +recv+.
# state  - not used by this method.
#
# Returns the received data (prefix removed in one-way mode), or whatever
# falsy value +recv+ returned.
def _read(stream, state)
  data = stream.recv(0)
  return data unless data && one_way

  _prefix, body = data.split(/ /, 2)
  body
end

#_receive_result(state) ⇒ Object



33
34
35
36
# File 'lib/asir/transport/zmq.rb', line 33

# Receives the result for +state+ unless the exchange is one-way.
#
# Returns nil without delegating when either this transport's +one_way+
# or +state.message.one_way+ is truthy; otherwise returns whatever the
# superclass implementation returns.
def _receive_result(state)
  if one_way || state.message.one_way
    nil
  else
    super
  end
end

#_send_result(state) ⇒ Object



38
39
40
41
# File 'lib/asir/transport/zmq.rb', line 38

# Sends the result for +state+ unless the exchange is one-way.
#
# Returns nil without delegating when either this transport's +one_way+
# or +state.message.one_way+ is truthy; otherwise returns whatever the
# superclass implementation returns.
def _send_result(state)
  if one_way || state.message.one_way
    nil
  else
    super
  end
end

#_server! ⇒ Object

!SLIDE 0MQ server.



24
25
26
27
28
29
30
31
# File 'lib/asir/transport/zmq.rb', line 24

# Creates the server-side 0MQ socket, binds it to +port+ and stores it in
# @server.
#
# A SUB socket subscribed to +queue+ is created when +one_way+ is truthy,
# otherwise a REP socket.
#
# Returns the bound socket.
#
# Raises the same exception class that the underlying call raised, with the
# message prefixed by this transport's class name and +zmq_uri+.
def _server!
  sock = nil
  sock = zmq_context.socket(one_way ? ZMQ::SUB : ZMQ::REP)
  sock.setsockopt(ZMQ::SUBSCRIBE, queue) if one_way
  # Binds to the wildcard interface on +port+.
  # NOTE(review): the original author found tcp://localhost:PORT did not
  # work here; presumably 0MQ's tcp bind expects an interface or address
  # rather than a host name -- confirm against the zmq_tcp documentation.
  sock.bind("tcp://*:#{port}")
  @server = sock
rescue ::StandardError => exc
  # Only StandardError is decorated: re-raising SystemExit or signal
  # exceptions by class with a new message would discard their state
  # (e.g. the exit status), so those propagate untouched.
  #
  # Release the half-initialized socket instead of leaking it.
  sock.close if sock
  raise exc.class, "#{self.class} #{zmq_uri}: #{exc.message}", exc.backtrace
end

#_server_accept_connection!(server) ⇒ Object

server represents a receiving ZMQ endpoint.



79
80
81
# File 'lib/asir/transport/zmq.rb', line 79

# Builds the [in_stream, out_stream] pair for +server+.
#
# The input stream is always +server+ itself; the output stream is nil
# when @one_way is truthy and +server+ otherwise.
def _server_accept_connection!(server)
  out_stream = @one_way ? nil : server
  [server, out_stream]
end

#_server_close_connection!(in_stream, out_stream) ⇒ Object

Nothing to be closed for ZMQ.



91
92
93
# File 'lib/asir/transport/zmq.rb', line 91

# Intentionally a no-op: neither +in_stream+ nor +out_stream+ is touched.
#
# Returns nil.
def _server_close_connection!(in_stream, out_stream)
  nil
end

#_write(payload, stream, state) ⇒ Object



43
44
45
46
47
48
49
50
51
# File 'lib/asir/transport/zmq.rb', line 43

# Sends +payload+ on +stream+ via +stream.send(payload, 0)+.
#
# In one-way mode the payload is prefixed with a queue name followed by a
# space. The name is taken from +state.message[:zmq_queue]+, then
# +state.message[:queue]+; if neither is available the default +queue_+
# prefix is used.
#
# payload - String to send. It is never modified; the prefix is added to a
#           copy, so frozen or re-used payloads are safe.
# stream  - object responding to +send(data, flags)+.
# state   - may be nil; otherwise must respond to +message+.
#
# Returns +stream+.
def _write payload, stream, state
  if one_way
    message = state && state.message
    q = message && (message[:zmq_queue] || message[:queue])
    prefix = q ? "#{q} " : queue_
    # Insert into a dup so the caller's string is left untouched.
    payload = payload.dup.insert(0, prefix)
  end
  stream.send payload, 0
  stream
end

#queue_ ⇒ Object



73
74
75
76
# File 'lib/asir/transport/zmq.rb', line 73

# Prefix string derived from +queue+, memoized in @queue_.
#
# Returns the frozen string "asir " when +queue+ is empty, otherwise
# +queue+ followed by a single space. The value is computed once; later
# changes to +queue+ are not reflected.
def queue_
  return @queue_ if @queue_

  name = queue
  prefix = name.empty? ? "asir " : name + " "
  @queue_ = prefix.freeze
end

#stream_eof?(stream) ⇒ Boolean

Returns:

  • (Boolean)


86
87
88
# File 'lib/asir/transport/zmq.rb', line 86

# End-of-stream predicate.
#
# Always returns false; +stream+ is not inspected.
def stream_eof?(stream)
  false
end

#zmq_context ⇒ Object



104
105
106
107
# File 'lib/asir/transport/zmq.rb', line 104

# Lazily created ZMQ::Context held in the class variable @@zmq_context.
#
# The context is built with ZMQ::Context.new(1) on first use and the same
# object is returned on every later call.
def zmq_context
  return @@zmq_context if defined?(@@zmq_context) && @@zmq_context

  @@zmq_context = ZMQ::Context.new(1)
end

#zmq_uri ⇒ Object



95
96
97
98
99
100
101
102
# File 'lib/asir/transport/zmq.rb', line 95

# +uri+ with its path component blanked out, memoized in @zmq_uri.
#
# Returns a frozen String. The value is computed once; later changes to
# +uri+ are not reflected.
def zmq_uri
  return @zmq_uri if @zmq_uri

  parsed = URI.parse(uri)
  parsed.path = ''
  @zmq_uri = parsed.to_s.freeze
end