Module: Altaire::Gateway
- Defined in:
- lib/altaire/gateway.rb
Class Method Summary collapse
- .provide(subject, queue: "provider", interruptable: false, &block) ⇒ Object
- .start(urls, &block) ⇒ Object
Class Method Details
.provide(subject, queue: "provider", interruptable: false, &block) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/altaire/gateway.rb', line 18 def self.provide (subject, queue: "provider", interruptable: false, &block) NATS.subscribe subject, queue: queue do |msg, reply| msg = JSON.parse(msg) thread = Thread.new do NATS.publish reply, { result: block.(msg) }.to_json rescue => e NATS.publish reply, { error: e. }.to_json ensure @requests.delete(msg["id"]) end @requests[msg["id"]] = thread if interruptable end end |
.start(urls, &block) ⇒ Object
7 8 9 10 11 12 13 14 15 16 |
# File 'lib/altaire/gateway.rb', line 7 def self.start (urls, &block) NATS.start servers: [urls].flatten do @requests = {} NATS.subscribe "meta.cancel" do |msg| @requests[msg.to_i].raise(Interrupt, "Cancelled by gateway") rescue Exception end block.(self) end end |