Class: Qswarm::Listener

Inherits:
Object
  • Object
show all
Extended by:
DSL
Includes:
Loggable
Defined in:
lib/qswarm/listener.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from DSL

dsl_accessor

Methods included from Loggable

#logger, logger

Constructor Details

#initialize(agent, name, args, &block) ⇒ Listener

Returns a new instance of Listener.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/qswarm/listener.rb', line 15

def initialize(agent, name, args, &block)
  @agent    = agent
  @name     = name.to_s
  @speakers = []
  @sinks    = []
  @format   = :json
  @instances = nil

  @queue_args     = { :auto_delete => true, :durable => true, :exclusive => true }
  @subscribe_args = { :exclusive => false, :ack => false }

  @speaker = args.delete :speaker unless args.nil?

  # @subscribe_args.merge! args.delete(:subscribe) unless args.nil?
  @queue_args.merge! args unless args.nil?

  self.instance_eval(&block)
end

Instance Attribute Details

#agentObject (readonly)

Returns the value of attribute agent.



13
14
15
# File 'lib/qswarm/listener.rb', line 13

def agent
  @agent
end

Instance Method Details

#ack?Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/qswarm/listener.rb', line 45

def ack?
  @subscribe_args[:ack]
end

#bind(routing_key, options = nil) ⇒ Object



34
35
36
37
38
# File 'lib/qswarm/listener.rb', line 34

def bind(routing_key, options = nil)
  @bind = routing_key
  @queue_args.merge! options unless options.nil?
  logger.info "Binding listener #{@name} < #{routing_key}"
end

#get_broker(name = nil) ⇒ Object



68
69
70
71
# File 'lib/qswarm/listener.rb', line 68

def get_broker(name = nil)
  name ||= @broker
  @agent.get_broker(name)
end

#runObject



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/qswarm/listener.rb', line 73

def run
  # Any setup that needs to be done
  @speakers.map { |s| s.run }
  
  @bind ||= @name
  logger.info "Listening on #{@name} < #{@bind}"

  get_broker.channel(@name, @bind).prefetch(@instances) unless @instances.nil?
  get_broker.queue(@name + @uuid ||= '', @bind, @queue_args).subscribe(@subscribe_args) do |, payload|
    logger.debug "[#{@agent.name}] Received '#{payload.inspect}' on listener #{@name}/#{.routing_key}"

    running = @speakers.map { |s| s.object_id }
    callback = proc do |speaker|
      running.delete speaker.object_id
      .ack if ack? && running.empty?
    end

    @speakers.map do |speaker|
      EM.defer nil, callback do
        speaker.parse(, payload)
        speaker
      end
    end
  end
end

#speak(name, args = nil, &block) ⇒ Object



58
59
60
61
62
63
64
65
66
# File 'lib/qswarm/listener.rb', line 58

def speak(name, args = nil, &block)
  if !args.nil? && !args[:type].nil?
    require "qswarm/speakers/#{args[:type].downcase}"
    @speakers << eval("Qswarm::Speakers::#{args[:type].capitalize}").new(self, name, args, &block)
  else
    @speakers << Qswarm::Speaker.new(self, name, args, &block)
  end
  logger.info "Registering speaker: #{name} < #{@name}"
end

#subscribe(*options) ⇒ Object

Not sure about this



41
42
43
# File 'lib/qswarm/listener.rb', line 41

def subscribe(*options)
  Array[*options].each { |o| @subscribe_args[o] = true }
end

#swarm(instances = nil) ⇒ Object



49
50
51
# File 'lib/qswarm/listener.rb', line 49

def swarm(instances = nil)
  @instances = instances
end

#uniqObject

Should this be subscribe options?



54
55
56
# File 'lib/qswarm/listener.rb', line 54

def uniq
  @uuid = '-' + UUID.generate
end