Class: BoomNats::Application
- Inherits:
-
Object
- Object
- BoomNats::Application
- Defined in:
- lib/boom_nats/application.rb
Instance Attribute Summary collapse
-
#nats_options ⇒ Object
Returns the value of attribute nats_options.
-
#route_topics ⇒ Object
readonly
Returns the value of attribute route_topics.
-
#router ⇒ Object
Returns the value of attribute router.
Instance Method Summary collapse
- #draw_routes(&block) ⇒ Object
- #error_as_json(msg, topic, error) ⇒ Object
- #execute(&block) ⇒ Object
-
#initialize ⇒ Application
constructor
A new instance of Application.
- #kill ⇒ Object
- #nats ⇒ Object
- #on_after(&block) ⇒ Object
- #on_before(&block) ⇒ Object
- #servers(value) ⇒ Object
- #setup(&block) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize ⇒ Application
Returns a new instance of Application.
9 10 11 12 13 14 15 16 17 18 |
# File 'lib/boom_nats/application.rb', line 9 def initialize @route_topics = [] @subscriptions = [] @callbacks = { before: [], after: [] } @mutex = Mutex.new end |
Instance Attribute Details
#nats_options ⇒ Object
Returns the value of attribute nats_options.
6 7 8 |
# File 'lib/boom_nats/application.rb', line 6 def @nats_options end |
#route_topics ⇒ Object (readonly)
Returns the value of attribute route_topics.
7 8 9 |
# File 'lib/boom_nats/application.rb', line 7 def route_topics @route_topics end |
#router ⇒ Object
Returns the value of attribute router.
6 7 8 |
# File 'lib/boom_nats/application.rb', line 6 def router @router end |
Instance Method Details
#draw_routes(&block) ⇒ Object
25 26 27 28 29 30 |
# File 'lib/boom_nats/application.rb', line 25 def draw_routes(&block) raise Error, "required block given" unless block_given? @router = BoomNats::Router.new(self) @router.setup(&block) end |
#error_as_json(msg, topic, error) ⇒ Object
48 49 50 51 52 53 54 55 56 |
# File 'lib/boom_nats/application.rb', line 48 def error_as_json(msg, topic, error) { message: msg, topic: topic, error: "#{error.class}: #{error.}", backtrace: error.backtrace, status: "error" }.to_json end |
#execute(&block) ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/boom_nats/application.rb', line 106 def execute(&block) timeout = Concurrent::Cancellation.timeout 5 done = Concurrent::Channel.new(capacity: 1) Concurrent::Channel.go do loop do @mutex.synchronize do done << true if nats.connected? end done << false if timeout.origin.resolved? end end if ~done block.call(nats) else raise "Nats do not connected", BoomNats::Error end end |
#kill ⇒ Object
95 96 97 98 99 100 101 102 103 104 |
# File 'lib/boom_nats/application.rb', line 95 def kill puts "exiting..." unless defined?(Rspec) sleep 1 Thread.new do @mutex.synchronize do stop exit unless defined?(Rspec) end end end |
#nats ⇒ Object
36 37 38 |
# File 'lib/boom_nats/application.rb', line 36 def nats NATS end |
#on_after(&block) ⇒ Object
44 45 46 |
# File 'lib/boom_nats/application.rb', line 44 def on_after(&block) @callbacks[:after] << block end |
#on_before(&block) ⇒ Object
40 41 42 |
# File 'lib/boom_nats/application.rb', line 40 def on_before(&block) @callbacks[:before] << block end |
#servers(value) ⇒ Object
20 21 22 23 |
# File 'lib/boom_nats/application.rb', line 20 def servers(value) stop @server = value end |
#setup(&block) ⇒ Object
32 33 34 |
# File 'lib/boom_nats/application.rb', line 32 def setup(&block) instance_eval(&block) if block_given? end |
#start ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/boom_nats/application.rb', line 58 def start Thread.new do @callbacks[:before].each { |callback| callback.call(self) } nats_connect do |nats| @route_topics.each do |rt| @subscriptions << nats.subscribe(rt.topic, rt.) do |msg, reply, _sub| rt.executor.new(msg, reply, nats, rt.serializer, rt.parser) rescue StandardError => e BoomNats.logger.error "BoomNats::error: #{e.}" nats.publish(reply, error_as_json(msg, rt.topic, e)) unless reply.nil? end end BoomNats.logger.debug "BoomNats::started" prepare_trap unless defined?(Rails::Railtie) @callbacks[:after].each { |callback| callback.call(self) } end end wait unless defined?(Rails::Railtie) end |
#stop ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/boom_nats/application.rb', line 83 def stop @subscriptions.each { |s| nats.unsubscribe(s) } @subscriptions = [] # disconnect from old server if already configured if nats&.connected? nats.drain do nats.stop end end end |