Class: Polyn::PullSubscriber
- Inherits: Object
  - Object
  - Polyn::PullSubscriber
- Defined in: lib/polyn/pull_subscriber.rb
Overview
Wrapper around nats-pure that can validate polyn messages
Class Method Summary
- .validate_consumer_exists!(nats, stream, consumer_name) ⇒ Object
  nats-pure will create a consumer if the one you passed does not exist.
Instance Method Summary
- #fetch(batch = 1, params = {}) ⇒ Array<NATS::Msg>
  Requests delivery of more messages from a pull consumer.
- #initialize(fields) ⇒ PullSubscriber (constructor)
  is more than the `source_root`.
Constructor Details
#initialize(fields) ⇒ PullSubscriber
is more than the `source_root`
    # File 'lib/polyn/pull_subscriber.rb', line 12

    def initialize(fields)
      @nats          = fields.fetch(:nats)
      @type          = Polyn::Naming.trim_domain_prefix(fields.fetch(:type))
      @consumer_name = Polyn::Naming.consumer_name(@type, fields[:source])
      @stream        = @nats.jetstream.find_stream_name_by_subject(@type)
      self.class.validate_consumer_exists!(@nats, @stream, @consumer_name)
      @psub          = @nats.jetstream.pull_subscribe(@type, @consumer_name)
      @serializer    = fields.fetch(:serializer)
    end
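The constructor reads `:nats`, `:type`, and `:serializer` with `Hash#fetch` but reads `:source` with `[]`, so the first three keys are required while `:source` is optional. The following is a minimal sketch of that difference using only a plain hash. `read_fields` is a hypothetical helper, not part of Polyn, and the placeholder values stand in for a real NATS connection and serializer.

```ruby
# Hypothetical helper mirroring how the constructor reads its `fields` hash.
# It is not part of Polyn; it only illustrates Hash#fetch versus Hash#[].
def read_fields(fields)
  {
    nats:       fields.fetch(:nats),       # required: KeyError when missing
    type:       fields.fetch(:type),       # required: KeyError when missing
    serializer: fields.fetch(:serializer), # required: KeyError when missing
    source:     fields[:source],           # optional: nil when missing
  }
end

# Placeholder values stand in for a real connection and serializer.
complete = { nats: :fake_connection, type: "user.created", serializer: :fake_serializer }
puts read_fields(complete)[:source].inspect

begin
  # :nats is missing, so Hash#fetch raises before anything else happens.
  read_fields({ type: "user.created", serializer: :fake_serializer })
rescue KeyError => e
  puts e.message
end
```

Under this reading, a caller that forgets `:nats`, `:type`, or `:serializer` fails immediately with a `KeyError`, before any request is made to the server.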
Class Method Details
.validate_consumer_exists!(nats, stream, consumer_name) ⇒ Object
nats-pure will create a consumer if the one you passed does not exist. Polyn avoids this behavior and instead encourages consumer creation in the centralized `events` codebase, so that each consumer is documented, discoverable, and manageable by polyn-cli.
    # File 'lib/polyn/pull_subscriber.rb', line 26

    def self.validate_consumer_exists!(nats, stream, consumer_name)
      nats.jetstream.consumer_info(stream, consumer_name)
    rescue NATS::JetStream::Error::NotFound
      # The trailing space keeps "create" and "it" apart when the literals join.
      raise Polyn::Errors::ValidationError,
        "Consumer #{consumer_name} does not exist. Use polyn-cli to create " \
        "it before attempting to subscribe"
    end
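The method above follows a look-up-then-translate pattern: ask for the consumer, and turn a "not found" error into a validation error with an actionable message. The sketch below reproduces that pattern with hypothetical stand-ins (`FakeJetStream`, `FakeNotFound`, `FakeValidationError`, `ensure_consumer_exists!`) so that it runs without nats-pure or Polyn installed. None of these names exist in either library.

```ruby
# Hypothetical stand-ins for the NATS and Polyn error classes.
class FakeNotFound < StandardError; end
class FakeValidationError < StandardError; end

# Fake JetStream context that only knows about a fixed set of consumers.
class FakeJetStream
  def initialize(existing_consumers)
    @existing_consumers = existing_consumers
  end

  # Mimics a lookup that raises when the consumer is unknown.
  def consumer_info(stream, consumer_name)
    raise FakeNotFound unless @existing_consumers.include?(consumer_name)

    { stream: stream, name: consumer_name }
  end
end

# Same shape as validate_consumer_exists!: look the consumer up and translate
# a "not found" error into a validation error.
def ensure_consumer_exists!(jetstream, stream, consumer_name)
  jetstream.consumer_info(stream, consumer_name)
rescue FakeNotFound
  raise FakeValidationError,
        "Consumer #{consumer_name} does not exist. Use polyn-cli to create " \
        "it before attempting to subscribe"
end

jetstream = FakeJetStream.new(["user_created_consumer"])

# An existing consumer passes through and returns the lookup result.
ensure_consumer_exists!(jetstream, "USERS", "user_created_consumer")

begin
  ensure_consumer_exists!(jetstream, "USERS", "missing_consumer")
rescue FakeValidationError => e
  puts e.message
end
```

Because the check runs before subscribing, a missing consumer surfaces as an explicit error instead of being created implicitly.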
Instance Method Details
#fetch(batch = 1, params = {}) ⇒ Array<NATS::Msg>
Requests delivery of more messages from a pull consumer.
    # File 'lib/polyn/pull_subscriber.rb', line 40

    def fetch(batch = 1, params = {})
      Polyn::Tracing.processing_span(@type) do |process_span|
        msgs = @psub.fetch(batch, params)
        msgs.map do |msg|
          Polyn::Tracing.subscribe_span(@type, msg, links: [process_span]) do |span|
            updated_msg = (msg)
            Polyn::Tracing.span_attributes(span,
              nats:    @nats,
              type:    @type,
              event:   updated_msg.data,
              payload: msg.data)
            updated_msg
          end
        end
      end
    end
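Since `#fetch` forwards `batch` and `params` to the underlying pull subscription and returns an array of messages, a caller would typically fetch a batch, handle each message, and acknowledge it. The sketch below shows one way such a loop might look. `FakeSubscriber`, `FakeMsg`, and `drain_batch` are hypothetical stand-ins so the example runs on its own, and the `timeout:` key is an assumption about what the underlying subscription accepts.

```ruby
# Hypothetical stand-in for a fetched message: carries data and records acks.
FakeMsg = Struct.new(:data, :acked) do
  def ack
    self.acked = true
  end
end

# Hypothetical stand-in for Polyn::PullSubscriber with the same #fetch shape.
class FakeSubscriber
  def initialize(payloads)
    @queue = payloads.map { |payload| FakeMsg.new(payload, false) }
  end

  # Returns up to `batch` messages; `params` is accepted but ignored here.
  def fetch(batch = 1, params = {})
    @queue.shift(batch)
  end
end

# Fetch one batch, pass each message's data to the block, then acknowledge it.
def drain_batch(subscriber, batch_size)
  msgs = subscriber.fetch(batch_size, timeout: 1) # `timeout:` is an assumed option
  msgs.each do |msg|
    yield msg.data
    msg.ack
  end
  msgs
end

subscriber = FakeSubscriber.new(%w[first second third])
handled = drain_batch(subscriber, 2) { |data| puts "handling #{data}" }
puts handled.length
```

Acknowledging only after the block returns means a message whose handler raises is left unacknowledged, which is one common choice for at-least-once processing.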