Class: EZMQ::Subscriber
Overview
Subscribe socket that listens for messages with an optional topic.
Instance Attribute Summary
Attributes inherited from Socket
#context, #decode, #encode, #socket
Instance Method Summary
- #initialize(mode = :connect, **options) ⇒ Subscriber (constructor)
  Creates a new Subscriber socket.
- #listen {|message, topic| ... } ⇒ void
  Like receive, but doesn’t stop at one message.
- #receive(**options) {|message, topic| ... } ⇒ Object
  Receive a message from the socket.
- #subscribe(topic) ⇒ Boolean
  Establishes a new message filter on the socket.
- #unsubscribe(topic) ⇒ Boolean
  Removes a message filter (as set with subscribe) from the socket.
Methods inherited from Socket
Constructor Details
#initialize(mode = :connect, **options) ⇒ Subscriber
Creates a new Subscriber socket.
The default behaviour is to output any messages received to STDOUT.
# File 'lib/ezmq/subscribe.rb', line 18
def initialize(mode = :connect, **options)
  super mode, ZMQ::SUB, options
  # Install the initial filter only when a topic was supplied.
  subscribe options[:topic] if options[:topic]
end
Instance Method Details
#listen {|message, topic| ... } ⇒ void
This method returns an undefined value.
Like receive, but doesn’t stop at one message.
# File 'lib/ezmq/subscribe.rb', line 58
def listen(&block)
  loop do
    # receive returns [message, topic]; the splat passes both to the block.
    block.call(*receive)
  end
end
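To illustrate how #listen hands each received pair to its block, here is a minimal sketch that needs no ZeroMQ socket. `FakeSubscriber` is a hypothetical stand-in, not part of EZMQ: its `receive` returns queued `[message, topic]` pairs and raises `StopIteration` once the queue is empty, which `Kernel#loop` rescues so the example terminates. A real Subscriber would keep listening instead.

```ruby
# Hypothetical stand-in for a Subscriber; no ZeroMQ socket is involved.
class FakeSubscriber
  def initialize(pairs)
    @pairs = pairs.dup
  end

  # Returns [message, topic], like #receive called without a block.
  # Raising StopIteration on an empty queue makes Kernel#loop exit cleanly.
  def receive
    raise StopIteration if @pairs.empty?

    @pairs.shift
  end

  # Same shape as the documented #listen.
  def listen(&block)
    loop do
      block.call(*receive)
    end
  end
end

seen = []
FakeSubscriber.new([%w[rain weather], %w[goal sport]]).listen do |message, topic|
  seen << "#{topic}: #{message}"
end
puts seen
```

The block receives the message first and the topic second, matching the `{|message, topic| ... }` signature.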
#receive(**options) {|message, topic| ... } ⇒ Object
This method blocks until a message arrives.
Receive a message from the socket.
# File 'lib/ezmq/subscribe.rb', line 36
def receive(**options)
  message = ''
  @socket.recv_string message

  # Split the raw frame at the first space into topic and body.
  message = message.match(/^(?<topic>[^\ ]*)\ (?<body>.*)/m)

  decoded = (options[:decode] || @decode).call message['body']
  if block_given?
    yield decoded, message['topic']
  else
    [decoded, message['topic']]
  end
end
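The topic/body split in #receive can be tried in isolation. The sketch below reuses the regular expression from the listing; `split_frame` and its `decode:` keyword are hypothetical helpers for illustration, not part of EZMQ. Unlike the listing, the helper returns `nil` for a frame that contains no space, which is an assumption made here to keep the example safe to run.

```ruby
# Same pattern as in #receive: the topic is everything before the first
# space, the body is the remainder (/m lets the body span several lines).
FRAME_PATTERN = /^(?<topic>[^\ ]*)\ (?<body>.*)/m

# Hypothetical helper, not part of EZMQ: splits a raw frame and decodes the body.
def split_frame(raw, decode: ->(body) { body })
  match = raw.match(FRAME_PATTERN)
  return nil if match.nil? # no space, so no topic/body boundary

  [decode.call(match['body']), match['topic']]
end

body, topic = split_frame("weather sunny\nwith light wind")
puts "#{topic}: #{body.inspect}"

# A custom decoder plays the role of the :decode option.
count, = split_frame('counter 42', decode: ->(b) { Integer(b) })
puts count + 1
```

As in #receive, the result is ordered `[decoded_body, topic]`, so destructuring takes the body first.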
#subscribe(topic) ⇒ Boolean
Establishes a new message filter on the socket.
By default, a Subscriber filters out all incoming messages. Without calling subscribe at least once, no messages will be accepted. If a topic was provided, #initialize calls #subscribe automatically.
Messages matching the topic prefix will be accepted.
# File 'lib/ezmq/subscribe.rb', line 75
def subscribe(topic)
  # True when the socket option was set successfully (return code 0).
  @socket.setsockopt(ZMQ::SUBSCRIBE, topic) == 0
end
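The filtering itself is done by ZeroMQ, which compares each subscription against the start of the incoming message. The following sketch only mimics that prefix rule locally to show its effect; `accepted?` and the sample frames are hypothetical and do not touch a real socket.

```ruby
# Hypothetical helper, not part of EZMQ: mimics ZeroMQ's prefix filter.
# A frame is accepted when it starts with at least one subscribed prefix.
def accepted?(frame, subscriptions)
  subscriptions.any? { |prefix| frame.start_with?(prefix) }
end

frames = ['weather.london rain', 'weather.paris sun', 'news.sport goal']

# No subscriptions: nothing is accepted.
p(frames.select { |f| accepted?(f, []) })

# A prefix accepts every topic that starts with it.
p(frames.select { |f| accepted?(f, ['weather']) })

# The empty string is a prefix of everything, so it accepts all frames.
p(frames.select { |f| accepted?(f, ['']) })
```

Because matching is by prefix, subscribing to `weather` also accepts `weather.london` and `weather.paris`; subscribe to the full topic when only one is wanted.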
#unsubscribe(topic) ⇒ Boolean
Removes a message filter (as set with subscribe) from the socket.
# File 'lib/ezmq/subscribe.rb', line 86
def unsubscribe(topic)
  # True when the socket option was set successfully (return code 0).
  @socket.setsockopt(ZMQ::UNSUBSCRIBE, topic) == 0
end