Class: Polyn::Conn
- Inherits: Object
  - Object
  - Polyn::Conn
- Defined in: lib/polyn.rb
Overview
A Polyn connection to NATS
Instance Method Summary
- #initialize(nats, **opts) ⇒ Conn (constructor)
  A new instance of Conn.
- #publish(type, data, **opts) ⇒ Object
  Publishes a message on the Polyn network.
- #pull_subscribe(type, **opts) ⇒ Object
  Subscribe to a pull consumer that already exists in the NATS server.
- #subscribe(type, opts = {}, &callback) ⇒ Object
  Create subscription which is dispatched asynchronously and sends messages to a callback.
Constructor Details
#initialize(nats, **opts) ⇒ Conn
Returns a new instance of Conn.
# File 'lib/polyn.rb', line 69
def initialize(nats, **opts)
  @nats = nats_class.new(nats)
  # Schema store nats has to be a real one, not a mock, because
  # the only place to load the schemas is from a running nats-server
  @schema_store = opts[:schema_store] || schema_store(nats, **opts)
  @serializer = Polyn::Serializers::Json.new(@schema_store)
end
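The constructor accepts an optional `:schema_store` and otherwise builds one itself. A minimal sketch of that fallback pattern follows. `FakeSchemaStore` and `SketchConn` are hypothetical stand-ins invented for this illustration. They are not part of Polyn, and no NATS server is involved.

```ruby
# Hypothetical stand-in for a schema store; not part of Polyn's API.
class FakeSchemaStore
  attr_reader :label

  def initialize(label)
    @label = label
  end
end

# Hypothetical class that mirrors only the `opts[:schema_store] || default` fallback.
class SketchConn
  attr_reader :schema_store

  def initialize(**opts)
    # Use the injected store when one is given, otherwise build a default.
    @schema_store = opts[:schema_store] || default_schema_store
  end

  private

  def default_schema_store
    FakeSchemaStore.new("default")
  end
end

injected = SketchConn.new(schema_store: FakeSchemaStore.new("injected"))
fallback = SketchConn.new

puts injected.schema_store.label
puts fallback.schema_store.label
```

Passing a prepared store this way is one plausible way to avoid building a new one per connection, assuming the store object can be shared.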
Instance Method Details
#publish(type, data, **opts) ⇒ Object
Publishes a message on the Polyn network.
Uses information from the event to build up the `polyntrace` data.
# File 'lib/polyn.rb', line 86
def publish(type, data, **opts)
  Polyn::Tracing.publish_span(type) do |span|
    event = Event.new({
      type: type,
      source: opts[:source],
      data: data,
    })

    json = @serializer.serialize!(event)

    Polyn::Tracing.span_attributes(span,
      nats: @nats.nats,
      type: type,
      event: event,
      payload: json)

    header = add_headers(opts.fetch(:header, {}), event)

    @nats.publish(type, json, opts[:reply_to], header: header)
  end
end
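The listing above builds an event, serializes it, merges headers, and hands the result to the NATS client. The sketch below walks through the same sequence against a recording stand-in so it can run without a server. `RecordingNats`, `SketchPublisher`, the `sketch-event-type` header, and the `user.created.v1` event type are all assumptions made for this example. Tracing and schema validation are deliberately left out, and plain `JSON.generate` takes the place of the Polyn serializer.

```ruby
require "json"

# Hypothetical stand-in that records calls instead of talking to NATS.
class RecordingNats
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(subject, payload, reply_to = nil, header: {})
    @published << { subject: subject, payload: payload, reply_to: reply_to, header: header }
  end
end

# Hypothetical publisher that mirrors only the order of steps in #publish.
class SketchPublisher
  def initialize(nats)
    @nats = nats
  end

  def publish(type, data, **opts)
    # 1. Build the event envelope (a plain Hash here, not Polyn's Event).
    event = { type: type, source: opts[:source], data: data }
    # 2. Serialize it (no schema validation in this sketch).
    json = JSON.generate(event)
    # 3. Merge caller headers with one derived from the event.
    #    The "sketch-event-type" header name is invented for this example.
    header = opts.fetch(:header, {}).merge("sketch-event-type" => type)
    # 4. Hand everything to the client.
    @nats.publish(type, json, opts[:reply_to], header: header)
  end
end

nats = RecordingNats.new
SketchPublisher.new(nats).publish(
  "user.created.v1", { name: "Mary" },
  source: "orders", header: { "x-trace" => "abc" }
)
puts nats.published.first[:payload]
```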
#pull_subscribe(type, **opts) ⇒ Object
Subscribe to a pull consumer that already exists in the NATS server
is more than the `source_root`
# File 'lib/polyn.rb', line 140
def pull_subscribe(type, **opts)
  Polyn::PullSubscriber.new({
    nats: @nats,
    type: type,
    source: opts[:source],
    serializer: @serializer,
  })
end
#subscribe(type, opts = {}, &callback) ⇒ Object
Create subscription which is dispatched asynchronously and sends messages to a callback.
# File 'lib/polyn.rb', line 116
def subscribe(type, opts = {}, &callback)
  @nats.subscribe(type, opts) do |msg|
    Polyn::Tracing.subscribe_span(type, msg) do |span|
      event = @serializer.deserialize!(msg.data)

      Polyn::Tracing.span_attributes(span,
        nats: @nats.nats,
        type: type,
        event: event,
        payload: msg.data)

      msg.data = event
      callback.call(msg)
    end
  end
end
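The listing above replaces `msg.data` with the deserialized event before the callback runs, so the callback never sees the raw payload. A rough, self-contained sketch of that wrapping pattern follows. `SketchMsg`, `InMemoryNats`, and `SketchSubscriber` are hypothetical names. `JSON.parse` stands in for the Polyn serializer, and tracing is omitted.

```ruby
require "json"

# Hypothetical message type with a writable data field.
SketchMsg = Struct.new(:subject, :data)

# Hypothetical in-memory client that dispatches to registered handlers.
class InMemoryNats
  def initialize
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(subject, _opts = {}, &handler)
    @handlers[subject] << handler
  end

  # Simulates a message arriving from the server.
  def deliver(subject, raw)
    @handlers[subject].each { |handler| handler.call(SketchMsg.new(subject, raw)) }
  end
end

# Hypothetical subscriber that mirrors the decode-then-callback wrapping.
class SketchSubscriber
  def initialize(nats)
    @nats = nats
  end

  def subscribe(type, opts = {}, &callback)
    @nats.subscribe(type, opts) do |msg|
      # Swap the raw payload for the decoded event before user code sees it.
      msg.data = JSON.parse(msg.data)
      callback.call(msg)
    end
  end
end

nats = InMemoryNats.new
received = []
SketchSubscriber.new(nats).subscribe("user.created.v1") { |msg| received << msg.data }
nats.deliver("user.created.v1", '{"data":{"name":"Mary"}}')
puts received.first["data"]["name"]
```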