Class: Deployable::Zmq::Subscribe

Inherits:
Object
  • Object
show all
Defined in:
lib/deployable/zmq/subscribe.rb

Overview

Deployable::Zmq provides a generic set of helpers do you don't have to do so much leg work.

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Subscribe

Returns a new instance of Subscribe.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/deployable/zmq/subscribe.rb', line 19

def initialize options = {}
  @context    = ZMQ::Context.new
  @subscriber = @context.socket ZMQ::SUB

  @port       = options.fetch :port, __class_ivg('port')
  @port       = DEFAULT_BIND_PORT if @port.nil?

  @address    = options.fetch :options, __class_ivg('address')
  raise "No valid address [#{@address}]" if @address.nil? or @address == '*'

  url = "tcp://#{@address}:#{@port}"

  raise "Failed to connect to [#{url}] [#{rc}]" unless
    rc = @subscriber.connect( url ) == 0 

  #log.debug "zmq subscribing to [#{url}]"

  @subscribe = options.fetch( :subscribe, nil )
  
  raise "Failed to subscribe to [#{url}] [#{rc}]" unless
    @subscribe.nil? or 
    rc = @subscriber.setsockopt( ZMQ::SUBSCRIBE, 'REQ_COMP' ) == 0 

end

Instance Method Details

#endObject

End a subscription



59
60
61
# File 'lib/deployable/zmq/subscribe.rb', line 59

def end
  @subscriber.close
end

#go(callback) ⇒ Object

Watch a queue for messages



52
53
54
55
56
# File 'lib/deployable/zmq/subscribe.rb', line 52

def go callback
  loop do
    callback.call receive
  end
end

#receiveObject

Recieve a message off the queue



45
46
47
48
49
# File 'lib/deployable/zmq/subscribe.rb', line 45

def receive
  @subscriber.recv_strings( parts = [] )
  #log.debug "a queue item [%s]\n", parts.join(' ')
  parts
end