Class: ZMQMachine::Device::Queue
- Inherits: Object
  - Object
  - ZMQMachine::Device::Queue
- Defined in: lib/zm/device/queue.rb
Overview
Used in conjunction with REQ/REP sockets to load-balance requests and replies over (potentially) multiple backends.
The basic mechanics are that the program contains one or more clients that talk to one or more backends, all of which perform the same work. Connecting to an intermediate queue device allows client requests to be fair-balanced among the available backend servers. The hidden identities passed along by REQ/REP sockets are used by the queue device's internal XREQ/XREP sockets to route each reply back to the appropriate client.
Example:
# the queue creates sockets and binds to both given addresses; all messages get
# routed between the two
config = ZM::Device::Configuration.new
config.reactor = reactor
config.incoming_endpoint = "tcp://192.168.0.100:5050"
config.outgoing_endpoint = "tcp://192.168.0.100:5051"
config.verbose = false
config.linger = 10 # ms
config.hwm = 0
queue = ZM::Device::Queue.new(config)
# the +client_handler+ internally calls "connect" to the incoming address given above
client = reactor.req_socket(client_handler)
client2 = reactor.req_socket(client_handler)
# the +server_handler+ internally calls "connect" to the outgoing address given above
server = reactor.rep_socket(server_handler)
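The routing described in the overview can be illustrated without any sockets. The following is a minimal sketch in plain Ruby, not the device's actual implementation: `ToyQueue`, its `route` method, and the lambda backends are hypothetical names invented for this illustration, and simple round-robin stands in for the fair-balancing that the XREQ socket performs.

```ruby
# Toy model of the queue's routing; no ZeroMQ involved.
# ToyQueue and route are hypothetical names, not part of ZMQMachine.
class ToyQueue
  def initialize(backends)
    @backends = backends   # callables standing in for REP servers
    @next_backend = 0
  end

  # A request arrives tagged with the sending client's identity.
  # Returns the [identity, reply] pair that would be routed back.
  def route(identity, request)
    backend = @backends[@next_backend]
    @next_backend = (@next_backend + 1) % @backends.size  # round-robin
    [identity, backend.call(request)]
  end
end

backends = [
  ->(req) { "A handled #{req}" },
  ->(req) { "B handled #{req}" }
]
queue = ToyQueue.new(backends)

results = [
  queue.route("client-1", "job 1"),
  queue.route("client-2", "job 2"),
  queue.route("client-1", "job 3")
]
results.each { |identity, reply| puts "#{identity} <- #{reply}" }
```

The identity travels with the request and comes back attached to the reply, which is why a client always receives its own answer even though consecutive requests may be served by different backends.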
Defined Under Namespace
Classes: XRepHandler, XReqHandler
Instance Method Summary
- #initialize(config) ⇒ Queue (constructor)
  Takes a configuration object whose incoming_endpoint and outgoing_endpoint are each either a properly formatted string that can be converted into a ZM::Address or a ZM::Address directly.
Constructor Details
#initialize(config) ⇒ Queue
Takes a configuration object whose incoming_endpoint and outgoing_endpoint are each either a properly formatted string that can be converted into a ZM::Address or a ZM::Address directly.
Routes all messages received at either address to the other address.
# File 'lib/zm/device/queue.rb', line 153
def initialize(config)
  @reactor = config.reactor
  incoming = Address.from_string(config.incoming_endpoint.to_s)
  outgoing = Address.from_string(config.outgoing_endpoint.to_s)

  # setup the handlers for processing messages
  @handler_in = XRepHandler.new(config, incoming, :in)
  @handler_out = XReqHandler.new(config, outgoing, :out)

  # create each socket and pass in the appropriate handler
  @incoming_sock = @reactor.xrep_socket(@handler_in)
  @outgoing_sock = @reactor.xreq_socket(@handler_out)

  # set each handler's outgoing socket
  @handler_in.socket_out = @outgoing_sock
  @handler_out.socket_out = @incoming_sock
end
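The key step in the constructor is the cross-wiring at the end: each handler is given the other side's socket as its `socket_out`. The sketch below shows that pattern in isolation using stand-in classes. `FakeSocket`, `ForwardingHandler`, `on_readable`, and `send_messages` are assumptions made for illustration and are not claimed to match the real XRepHandler/XReqHandler or the ZMQMachine socket API.

```ruby
# Stand-ins for illustration only; these are not ZMQMachine classes.
class FakeSocket
  attr_reader :name, :sent

  def initialize(name)
    @name = name
    @sent = []   # records every message list "sent" on this socket
  end

  def send_messages(messages)
    @sent << messages
  end
end

class ForwardingHandler
  attr_accessor :socket_out

  # Invoked when this handler's own socket has received messages;
  # forwards them unchanged to the socket on the opposite side.
  def on_readable(_socket, messages)
    socket_out.send_messages(messages)
  end
end

incoming_sock = FakeSocket.new(:incoming)
outgoing_sock = FakeSocket.new(:outgoing)
handler_in    = ForwardingHandler.new
handler_out   = ForwardingHandler.new

# Cross-wire: each handler writes to the other side's socket.
handler_in.socket_out  = outgoing_sock
handler_out.socket_out = incoming_sock

# A request read on the incoming side leaves via the outgoing socket,
# and the reply read on the outgoing side leaves via the incoming socket.
handler_in.on_readable(incoming_sock, ["client-id", "", "request"])
handler_out.on_readable(outgoing_sock, ["client-id", "", "reply"])

puts outgoing_sock.sent.inspect
puts incoming_sock.sent.inspect
```

Because neither handler writes to its own socket, the device never echoes a message back to the side it came from; it only relays between the two endpoints.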