Class: Crossroads::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/crossroads/runner.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configfile) ⇒ Runner

Returns a new instance of Runner.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/crossroads/runner.rb', line 5

# Builds a runner from a YAML config file: merges the file over the built-in
# defaults, configures logging, creates the router and subscribes to the
# configured sources.
#
# The merge is shallow, so a :stomp entry in the file replaces the default
# server list as a whole rather than being combined with it.
#
# @param configfile [String] path to the YAML configuration file
# @raise [RuntimeError] when the merged config has no :consume entry
def initialize(configfile)
  # NOTE(review): :daemonie looks like a misspelling of "daemonize"; it is
  # left unchanged because existing config files may already use this key.
  defaults = {:configdir  => "/etc/crossroads",
              :consume    => nil,
              :logfile    => "/dev/stderr",
              :loglevel   => :info,
              :keeplogs   => 10,
              :maxlogsize => 1024,
              :daemonie   => false,
              :stomp      => [{:server   => "localhost",
                              :port     => 61613,
                              :user     => nil,
                              :password => nil}]}

  # YAML.load_file yields a falsy value for an empty document. Falling back
  # to an empty hash lets that case reach the explicit :consume check below
  # instead of failing inside Hash#merge with an unrelated TypeError.
  @config = defaults.merge(YAML.load_file(configfile) || {})

  raise "Config file does not have input sources defined" unless @config[:consume]

  Log.configure(@config[:logfile], @config[:keeplogs], @config[:maxlogsize], @config[:loglevel])

  Log.info("Crossroads version #{Crossroads.version} starting with configfile #{configfile}")

  @router = Router.new(@config[:configdir])

  subscribe
end

Instance Attribute Details

#router ⇒ Object (readonly)

Returns the value of attribute router.



3
4
5
# File 'lib/crossroads/runner.rb', line 3

# Reader for the @router instance variable, which the constructor assigns
# from Router.new.
def router
  @router
end

Instance Method Details

#process ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/crossroads/runner.rb', line 35

# Consumes messages from @stomp in an endless loop and republishes each one
# to every target returned by #route.
#
# For every target the outgoing headers are the received headers plus
# "crossroads_route" (target[:name]) and "crossroads_time"
# (target[:process_time]). The original "destination", "timestamp" and
# "message-id" headers are moved to "crossroads_orig_<name>" so they do not
# travel under their original names on the republished message.
#
# The loop ends on Interrupt or SystemExit. Any StandardError is logged and
# the loop continues after a one second pause. Other exceptions (for example
# a SignalException raised by SIGTERM, or NoMemoryError) are deliberately not
# rescued, so the process can still be terminated.
def process
  loop do
    begin
      msg = @stomp.receive

      route(msg).each do |target|
        # merge returns a new hash, so msg.headers itself is left untouched.
        headers = msg.headers.merge("crossroads_route" => target[:name],
                                    "crossroads_time"  => target[:process_time])

        ["destination", "timestamp", "message-id"].each do |name|
          # Hash#delete returns the removed value (nil when the key is absent).
          headers["crossroads_orig_#{name}"] = headers.delete(name)
        end

        @stomp.publish(target[:target], msg.body, headers)
      end
    rescue Interrupt, SystemExit
      break
    rescue StandardError => e
      Log.error("Failed to consume from the middleware: #{e.class}: #{e}")

      # Back off briefly; the enclosing loop then tries the next receive.
      sleep 1
    end
  end
end

#route(msg) ⇒ Object



71
72
73
# File 'lib/crossroads/runner.rb', line 71

# Passes msg to @router.route and returns whatever the router returns.
# #process iterates that result and reads :name, :target and :process_time
# from each element.
def route(msg)
  @router.route(msg)
end

#run! ⇒ Object



31
32
33
# File 'lib/crossroads/runner.rb', line 31

# Entry point for the runner: delegates straight to #process and returns
# only when that loop ends.
def run!
  process
end

#subscribe ⇒ Object



63
64
65
66
67
68
69
# File 'lib/crossroads/runner.rb', line 63

# Creates the Stomp client from @config[:stomp], stores it in @stomp and
# subscribes it to every configured source.
#
# @config[:consume] may be a single source or a (possibly nested) list of
# sources; it is wrapped and flattened so both forms are handled alike.
def subscribe
  @stomp = Stomp.new(@config[:stomp])

  sources = [@config[:consume]].flatten
  sources.each { |src| @stomp.subscribe(src) }
end