Module: Altaire::Gateway

Defined in:
lib/altaire/gateway.rb

Class Method Summary

Class Method Details

.provide(subject, queue: "provider", interruptable: false, &block) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/altaire/gateway.rb', line 18

# Subscribes to +subject+ on NATS and answers each incoming request with the
# result of +block+.
#
# Each message is parsed as JSON and handed to +block+ on a new thread. The
# block's return value is published to the reply subject as
# <tt>{"result": ...}</tt>. If the block raises a StandardError, or the worker
# is interrupted, <tt>{"error": message}</tt> is published instead. A payload
# that is not valid JSON is answered with an error reply and no worker is
# started.
#
# @param subject [String] NATS subject to subscribe to
# @param queue [String] queue name passed through to NATS.subscribe
# @param interruptable [Boolean] when true, the worker thread is stored in
#   @requests under the message's "id" while the request is being processed
# @yieldparam msg [Object] the parsed JSON payload
# @yieldreturn [Object] value serialised under the "result" key
# @return [Object] whatever NATS.subscribe returns
def self.provide(subject, queue: "provider", interruptable: false, &block)
  NATS.subscribe subject, queue: queue do |raw, reply|
    begin
      msg = JSON.parse(raw)
    rescue JSON::ParserError => parse_error
      # Answer malformed payloads instead of raising inside the subscription callback.
      NATS.publish reply, { error: parse_error.message }.to_json
      next
    end

    # Only JSON objects can carry an "id"; other JSON values are still passed to the block.
    request_id = msg["id"] if msg.is_a?(Hash)
    tracked = interruptable && !request_id.nil?

    worker = Thread.new do
      NATS.publish reply, { result: block.(msg) }.to_json
    rescue StandardError, Interrupt => e
      # Interrupt is not a StandardError, so it must be named explicitly for an
      # interrupted worker to still send a reply.
      NATS.publish reply, { error: e.message }.to_json
    ensure
      # Untracked requests never touch @requests.
      @requests.delete(request_id) if tracked
    end
    @requests[request_id] = worker if tracked
  end
end

.start(urls, &block) ⇒ Object



7
8
9
10
11
12
13
14
15
16
# File 'lib/altaire/gateway.rb', line 7

# Connects to NATS and sets up request cancellation.
#
# Inside the NATS.start block this resets the @requests registry, subscribes
# to the "meta.cancel" subject, and then calls +block+ (if given) with +self+.
#
# A "meta.cancel" message is converted with +to_i+ and used as the key into
# @requests. If a thread is registered under that key, Interrupt is raised in
# it. Unknown ids are ignored.
#
# @param urls [String, Array<String>] one server URL or a (possibly nested)
#   list of URLs; flattened before being passed as +servers:+
# @yieldparam gateway [Object] +self+
# @return [Object] whatever NATS.start returns
def self.start(urls, &block)
  NATS.start servers: [urls].flatten do
    @requests = {}
    NATS.subscribe "meta.cancel" do |msg|
      # Safe navigation covers ids with no registered thread.
      @requests[msg.to_i]&.raise(Interrupt, "Cancelled by gateway")
    rescue StandardError => e
      # Cancellation is best-effort: report the failure rather than let it
      # escape the subscription callback.
      warn "Altaire::Gateway: failed to cancel request #{msg.inspect}: #{e.message}"
    end
    block&.call(self)
  end
end