Class: Cabin::Outputs::ZeroMQ
- Inherits:
-
Object
- Object
- Cabin::Outputs::ZeroMQ
- Defined in:
- lib/cabin/outputs/zeromq.rb
Overview
Output to a zeromq socket.
Constant Summary collapse
- DEFAULTS =
{ :topology => "pushpull", :hwm => 0, # zeromq default: no limit :linger => -1, # zeromq default: wait until all messages are sent. :topic => "" }
- CONTEXT =
ZMQ::Context.new
Instance Attribute Summary collapse
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
-
#topology ⇒ Object
readonly
Returns the value of attribute topology.
Instance Method Summary collapse
- #<<(event) ⇒ Object
- #hwm ⇒ Object
-
#initialize(addresses, options = {}) ⇒ ZeroMQ
constructor
Create a new ZeroMQ output.
- #linger ⇒ Object
- #teardown ⇒ Object
Constructor Details
#initialize(addresses, options = {}) ⇒ ZeroMQ
Create a new ZeroMQ output.
arguments: addresses A list of addresses to connect to. These are round-robined by zeromq.
:topology Either ‘pushpull’ or ‘pubsub’. Specifies which zeromq socket type to use. Default pushpull. :hwm Specifies the High Water Mark for the socket. Default 0, which means there is none. :linger Specifies the linger time in milliseconds for the socket. Default -1, meaning wait forever for the socket to close. :topic Specifies the topic for a pubsub topology. This can be a string or a proc with the event as the only argument.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/cabin/outputs/zeromq.rb', line 26 def initialize(addresses, ={}) = DEFAULTS.merge() @topology = [:topology].to_s case @topology when "pushpull" socket_type = ZMQ::PUSH when "pubsub" socket_type = ZMQ::PUB end @topic = [:topic] @socket = CONTEXT.socket(socket_type) Array(addresses).each do |address| error_check @socket.connect(address), "connecting to #{address}" end error_check @socket.setsockopt(ZMQ::LINGER, [:linger]), "while setting ZMQ::LINGER to #{[:linger]}" error_check @socket.setsockopt(ZMQ::HWM, [:hwm]), "while setting ZMQ::HWM to #{[:hwm]}" #TODO use cabin's teardown when it exists at_exit do teardown end #define_finalizer end |
Instance Attribute Details
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
15 16 17 |
# File 'lib/cabin/outputs/zeromq.rb', line 15 def socket @socket end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
15 16 17 |
# File 'lib/cabin/outputs/zeromq.rb', line 15 def topic @topic end |
#topology ⇒ Object (readonly)
Returns the value of attribute topology.
15 16 17 |
# File 'lib/cabin/outputs/zeromq.rb', line 15 def topology @topology end |
Instance Method Details
#<<(event) ⇒ Object
67 68 69 70 71 72 73 |
# File 'lib/cabin/outputs/zeromq.rb', line 67 def <<(event) if @socket.name == "PUB" topic = @topic.is_a?(Proc) ? @topic.call(event) : @topic error_check @socket.send_string(topic, ZMQ::SNDMORE), "in topic send_string" end error_check @socket.send_string(event.inspect), "in send_string" end |
#hwm ⇒ Object
61 62 63 64 65 |
# File 'lib/cabin/outputs/zeromq.rb', line 61 def hwm array = [] error_check @socket.getsockopt(ZMQ::HWM, array), "while getting ZMQ::HWM" array.first end |
#linger ⇒ Object
55 56 57 58 59 |
# File 'lib/cabin/outputs/zeromq.rb', line 55 def linger array = [] error_check @socket.getsockopt(ZMQ::LINGER, array), "while getting ZMQ::LINGER" array.first end |
#teardown ⇒ Object
75 76 77 |
# File 'lib/cabin/outputs/zeromq.rb', line 75 def teardown @socket.close if @socket end |