Class: BoomNats::Application

Inherits:
Object
  • Object
show all
Defined in:
lib/boom_nats/application.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeApplication

Returns a new instance of Application.



9
10
11
12
13
14
15
16
17
18
# File 'lib/boom_nats/application.rb', line 9

def initialize
  @route_topics = []
  @subscriptions = []
  @callbacks = {
    before: [],
    after: []
  }

  @mutex = Mutex.new
end

Instance Attribute Details

#nats_optionsObject

Returns the value of attribute nats_options.



6
7
8
# File 'lib/boom_nats/application.rb', line 6

def nats_options
  @nats_options
end

#route_topicsObject (readonly)

Returns the value of attribute route_topics.



7
8
9
# File 'lib/boom_nats/application.rb', line 7

def route_topics
  @route_topics
end

#routerObject

Returns the value of attribute router.



6
7
8
# File 'lib/boom_nats/application.rb', line 6

def router
  @router
end

Instance Method Details

#draw_routes(&block) ⇒ Object

Raises:



25
26
27
28
29
30
# File 'lib/boom_nats/application.rb', line 25

def draw_routes(&block)
  raise Error, "required block given" unless block_given?

  @router = BoomNats::Router.new(self)
  @router.setup(&block)
end

#error_as_json(msg, topic, error) ⇒ Object



48
49
50
51
52
53
54
55
56
# File 'lib/boom_nats/application.rb', line 48

def error_as_json(msg, topic, error)
  {
    message: msg,
    topic: topic,
    error: "#{error.class}: #{error.message}",
    backtrace: error.backtrace,
    status: "error"
  }.to_json
end

#execute(&block) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/boom_nats/application.rb', line 106

def execute(&block)
  timeout = Concurrent::Cancellation.timeout 5
  done = Concurrent::Channel.new(capacity: 1)
  Concurrent::Channel.go do
    loop do
      @mutex.synchronize do
        done << true if nats.connected?
      end

      done << false if timeout.origin.resolved?
    end
  end

  if ~done
    block.call(nats)
  else
    raise "Nats do not connected", BoomNats::Error
  end
end

#killObject



95
96
97
98
99
100
101
102
103
104
# File 'lib/boom_nats/application.rb', line 95

def kill
  puts "exiting..." unless defined?(Rspec)
  sleep 1
  Thread.new do
    @mutex.synchronize do
      stop
      exit unless defined?(Rspec)
    end
  end
end

#natsObject



36
37
38
# File 'lib/boom_nats/application.rb', line 36

def nats
  NATS
end

#on_after(&block) ⇒ Object



44
45
46
# File 'lib/boom_nats/application.rb', line 44

def on_after(&block)
  @callbacks[:after] << block
end

#on_before(&block) ⇒ Object



40
41
42
# File 'lib/boom_nats/application.rb', line 40

def on_before(&block)
  @callbacks[:before] << block
end

#servers(value) ⇒ Object



20
21
22
23
# File 'lib/boom_nats/application.rb', line 20

def servers(value)
  stop
  @server = value
end

#setup(&block) ⇒ Object



32
33
34
# File 'lib/boom_nats/application.rb', line 32

def setup(&block)
  instance_eval(&block) if block_given?
end

#startObject



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/boom_nats/application.rb', line 58

def start
  Thread.new do
    @callbacks[:before].each { |callback| callback.call(self) }

    nats_connect do |nats|
      @route_topics.each do |rt|
        @subscriptions << nats.subscribe(rt.topic, rt.options) do |msg, reply, _sub|
          rt.executor.new(msg, reply, nats, rt.serializer, rt.parser)
        rescue StandardError => e
          BoomNats.logger.error "BoomNats::error: #{e.message}"
          nats.publish(reply, error_as_json(msg, rt.topic, e)) unless reply.nil?
        end
      end

      BoomNats.logger.debug "BoomNats::started"

      prepare_trap unless defined?(Rails::Railtie)

      @callbacks[:after].each { |callback| callback.call(self) }
    end
  end

  wait unless defined?(Rails::Railtie)
end

#stopObject



83
84
85
86
87
88
89
90
91
92
93
# File 'lib/boom_nats/application.rb', line 83

def stop
  @subscriptions.each { |s| nats.unsubscribe(s) }
  @subscriptions = []

  # disconnect from old server if already configured
  if nats&.connected?
    nats.drain do
      nats.stop
    end
  end
end