Class: Crossroads::Runner
- Inherits:
-
Object
- Object
- Crossroads::Runner
- Defined in:
- lib/crossroads/runner.rb
Instance Attribute Summary collapse
-
#router ⇒ Object
readonly
Returns the value of attribute router.
Instance Method Summary collapse
-
#initialize(configfile) ⇒ Runner
constructor
A new instance of Runner.
- #process ⇒ Object
- #route(msg) ⇒ Object
- #run! ⇒ Object
- #subscribe ⇒ Object
Constructor Details
#initialize(configfile) ⇒ Runner
Returns a new instance of Runner.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/crossroads/runner.rb', line 5 def initialize(configfile) defaults = {:configdir => "/etc/crossroads", :consume => nil, :logfile => "/dev/stderr", :loglevel => :info, :keeplogs => 10, :maxlogsize => 1024, :daemonie => false, :stomp => [{:server => "localhost", :port => 61613, :user => nil, :password => nil}]} @config = defaults.merge(YAML.load_file(configfile)) raise "Config file does not have input sources defined" unless @config[:consume] Log.configure(@config[:logfile], @config[:keeplogs], @config[:maxlogsize], @config[:loglevel]) Log.info("Crossroads version #{Crossroads.version} starting with configfile #{configfile}") @router = Router.new(@config[:configdir]) subscribe end |
Instance Attribute Details
#router ⇒ Object (readonly)
Returns the value of attribute router.
3 4 5 |
# File 'lib/crossroads/runner.rb', line 3 def router @router end |
Instance Method Details
#process ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/crossroads/runner.rb', line 35 def process loop do begin msg = @stomp.receive targets = route(msg) targets.each do |target| headers = msg.headers.merge({"crossroads_route" => target[:name], "crossroads_time" => target[:process_time]}) ["destination", "timestamp", "message-id"].each do |h| headers["crossroads_orig_#{h}"] = headers[h] headers.delete h end @stomp.publish(target[:target], msg.body, headers) end rescue Interrupt, SystemExit break rescue Exception => e Log.error("Failed to consume from the middleware: #{e.class}: #{e}") sleep 1 retry end end end |
#route(msg) ⇒ Object
71 72 73 |
# File 'lib/crossroads/runner.rb', line 71 def route(msg) @router.route(msg) end |
#run! ⇒ Object
31 32 33 |
# File 'lib/crossroads/runner.rb', line 31 def run! process end |