Class: Qswarm::Agent

Inherits:
Object
  • Object
show all
Includes:
DSL
Defined in:
lib/qswarm/agent.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from DSL

#dsl_call, #dsl_load, included

Constructor Details

#initialize(swarm, name, args = nil, &block) ⇒ Agent

Returns a new instance of Agent.



12
13
14
15
16
17
18
19
20
21
22
# File 'lib/qswarm/agent.rb', line 12

# Builds an agent with empty registries and then evaluates its DSL block.
#
# @param swarm [Object] stored in @swarm and exposed through #swarm
# @param name [Object] stored in @name; its #inspect form prefixes log lines
# @param args [Object, nil] stored unchanged in @args
# @param block [Proc] handed to dsl_call once all state has been set up
def initialize(swarm, name, args = nil, &block)
  # Who we are and who owns us
  @swarm, @name = swarm, name

  # Registries start empty; each gets its own Hash instance
  @clients = {}
  @args = args
  @filters, @handlers = {}, {}

  # No payload until #emit assigns one
  @payload = nil

  dsl_call(&block)
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



9
10
11
# File 'lib/qswarm/agent.rb', line 9

# Read-only accessor for the agent's name.
#
# @return [Object] the value held in @name (assigned from the `name`
#   argument of #initialize)
def name
  @name
end

#swarm ⇒ Object (readonly)

Returns the value of attribute swarm.



9
10
11
# File 'lib/qswarm/agent.rb', line 9

# Read-only accessor for the swarm this agent was created with.
#
# @return [Object] the value held in @swarm (assigned from the `swarm`
#   argument of #initialize)
def swarm
  @swarm
end

Instance Method Details

#after(connection, *guards, &block) ⇒ Object



58
59
60
61
62
63
64
# File 'lib/qswarm/agent.rb', line 58

# Registers an :after filter on one or more connections.
#
# @param connection [Object, Array] one connection identifier or a list of them
# @param guards [Array] extra arguments forwarded unchanged to register_filter
# @param block [Proc] filter body, forwarded to register_filter
# @return [Array] the connections the filter was registered for
def after(connection, *guards, &block)
  Qswarm.logger.info(
    "[#{@name.inspect}] Registering :after filter for #{connection.inspect}/#{guards.inspect}"
  )

  # Splat so that a single identifier and a list are handled the same way
  targets = [*connection]
  targets.each { |target| register_filter(:after, target, *guards, &block) }
end

#agent ⇒ Object



24
25
26
# File 'lib/qswarm/agent.rb', line 24

# Returns the agent itself.
#
# NOTE(review): presumably exists so DSL blocks can reach the enclosing
# agent by name — confirm against the DSL module.
#
# @return [Qswarm::Agent] self
def agent
  self
end

#before(connection, *guards, &block) ⇒ Object



50
51
52
53
54
55
56
# File 'lib/qswarm/agent.rb', line 50

# Registers a :before filter on one or more connections.
#
# @param connection [Object, Array] one connection identifier or a list of them
# @param guards [Array] extra arguments forwarded unchanged to register_filter
# @param block [Proc] filter body, forwarded to register_filter
# @return [Array] the connections the filter was registered for
def before(connection, *guards, &block)
  Qswarm.logger.info(
    "[#{@name.inspect}] Registering :before filter for #{connection.inspect}/#{guards.inspect}"
  )

  # Splat so that a single identifier and a list are handled the same way
  targets = [*connection]
  targets.each { |target| register_filter(:before, target, *guards, &block) }
end

#connect(name, args = nil, &block) ⇒ Object

Connects to a data stream

Parameters:

  • name (String)

    the name of the connection

  • args (Hash) (defaults to: nil)

    arguments for the connection

  • &block (Proc)

    a block which is passed to the client constructor



37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/qswarm/agent.rb', line 37

# Connects to a data stream and registers the resulting client under `name`.
#
# When `args[:type]` is present, the matching implementation file
# "qswarm/connections/<type>" is loaded on demand and the class
# Qswarm::Connections::<Type> is instantiated; otherwise a plain
# Qswarm::Connection is used.
#
# @param name [String] the name of the connection
# @param args [Hash, nil] arguments for the connection; the optional :type
#   entry selects a specialised connection class
# @param block [Proc] a block which is passed to the client constructor
# @return [Object] the newly created client, also stored in @clients[name]
# @raise [RuntimeError] if a connection is already registered under `name`
# @raise [LoadError] if no implementation file exists for the requested type
# @raise [NameError] if the loaded file does not define the expected class
def connect(name, args = nil, &block)
  raise "Connection '#{name.inspect}' is already registered" if @clients[name]

  type = args.nil? ? nil : args[:type]

  if type.nil?
    Qswarm.logger.info "[#{@name.inspect}] Registering default connection #{name.inspect}"
    return @clients[name] = Qswarm::Connection.new(self, name, args, &block)
  end

  Qswarm.logger.info "[#{@name.inspect}] Registering #{type.inspect} connection #{name.inspect}"

  # Deliberate lazy load: only the connection types actually used get required.
  require "qswarm/connections/#{type.downcase}"

  # Resolve the class by constant lookup instead of eval so the type string
  # can never be executed as code; `false` keeps the lookup inside
  # Qswarm::Connections rather than falling back to top-level constants.
  klass = Qswarm::Connections.const_get(type.capitalize, false)
  @clients[name] = klass.new(self, name, args, &block)
end

#emit(connection, args = nil, &block) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/qswarm/agent.rb', line 97

# Publishes a payload on one or more connections, running the :before
# filters, the handlers and the :after filters for each connection in turn.
#
# The payload is taken from args[:payload] when given, then overridden by the
# block's result (evaluated through dsl_call) when a block is supplied.
# Its raw content is decoded into `data` according to the payload's format.
#
# @param connection [Object, Array] one connection identifier or a list of them
# @param args [Hash, nil] optional; a non-nil :payload entry replaces @payload
# @param block [Proc] optional; its dsl_call result replaces @payload
# @return [Array] the connections that were processed
def emit(connection, args = nil, &block)
  # @payload must be set on the agent so handlers run through dsl_call can
  # read it. Because it overwrites shared state, nesting an emit inside a
  # source block will lose the payload that originated from the connection.
  supplied = args.nil? ? nil : args[:payload]
  @payload = supplied unless supplied.nil?
  @payload = dsl_call(&block) if block_given?

  Qswarm.logger.debug "[#{@name.inspect}] Connection #{connection.inspect} emitting #{@payload.inspect}"

  decoded =
    case payload.format
    when :json then JSON.parse(@payload.raw, symbolize_names: true)
    when :xml  then Nokogiri::XML(@payload.raw)
    else @payload.raw # :raw and anything unrecognised pass through untouched
    end
  @payload.data = decoded

  targets = [*connection]
  targets.each do |target|
    run_filters :before, target
    call_handlers target
    run_filters :after, target
  end
end

#payload ⇒ Object



28
29
30
# File 'lib/qswarm/agent.rb', line 28

# Returns the payload currently held by the agent.
#
# The value is nil after #initialize and is replaced by #emit.
#
# @return [Object, nil] the value held in @payload
def payload
  @payload
end

#run ⇒ Object



121
122
123
# File 'lib/qswarm/agent.rb', line 121

# Calls #run on every registered client, in registration order.
#
# @return [Hash] the @clients registry
def run
  @clients.each_value(&:run)
end

#sink(connection, args = nil, &block) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/qswarm/agent.rb', line 74

# Sends a copy of the current payload to one or more connections.
#
# The copy's data is overridden by args[:data] when given, then by the
# block's result (evaluated through dsl_call) when a block is supplied.
# When args is provided, the copy's raw form is regenerated from its data
# for each connection, using args[:format] or else that client's format.
# When args is nil, raw is left exactly as copied.
#
# @param connection [Object, Array] one connection identifier or a list of them
# @param args [Hash, nil] optional :data and :format entries; passed on to
#   each client's #sink
# @param block [Proc] optional; its dsl_call result becomes the copy's data
# @return [Array] the connections that were written to
def sink(connection, args = nil, &block)
  Qswarm.logger.debug "[#{@name.inspect}] Sink #{connection.inspect} received #{@payload.inspect}"

  # Work on a copy so the payload from the parent DSL context is not altered
  # by the local overrides below.
  outgoing = @payload.dup
  outgoing.data = args[:data] unless args.nil? || args[:data].nil?
  outgoing.data = dsl_call(&block) if block_given?

  targets = [*connection]
  targets.each do |target|
    unless args.nil?
      # Re-serialise raw from the current data for this target
      wire_format = args[:format].nil? ? @clients[target].format : args[:format]
      outgoing.raw =
        case wire_format
        when :json then JSON.generate(outgoing.data)
        when :xml  then outgoing.data.to_xml
        else outgoing.data # raw
        end
    end

    @clients[target].sink(args, outgoing)
  end
end

#source(connection, *guards, &block) ⇒ Object



66
67
68
69
70
71
72
# File 'lib/qswarm/agent.rb', line 66

# Registers a handler on one or more connections.
#
# @param connection [Object, Array] one connection identifier or a list of them
# @param guards [Array] extra arguments forwarded unchanged to register_handler
# @param block [Proc] handler body, forwarded to register_handler
# @return [Array] the connections the handler was registered for
def source(connection, *guards, &block)
  Qswarm.logger.info(
    "[#{@name.inspect}] Registering handler for #{connection.inspect}/#{guards.inspect}"
  )

  # Splat so that a single identifier and a list are handled the same way
  targets = [*connection]
  targets.each { |target| register_handler(target, *guards, &block) }
end