Class: Polyn::PullSubscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/polyn/pull_subscriber.rb

Overview

Wrapper around nats-pure that can validate polyn messages

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(fields) ⇒ PullSubscriber

is more than the ‘source_root`

Parameters:

  • fields (Object)

    :nats - Connected NATS instance from ‘NATS.connect`

  • fields (String)

    :type - The type of event

Options Hash (fields):

  • :source (String)
    • If the ‘source` portion of the consumer name


12
13
14
15
16
17
18
19
20
# File 'lib/polyn/pull_subscriber.rb', line 12

def initialize(fields)
  @nats          = fields.fetch(:nats)
  @type          = Polyn::Naming.trim_domain_prefix(fields.fetch(:type))
  @consumer_name = Polyn::Naming.consumer_name(@type, fields[:source])
  @stream        = @nats.jetstream.find_stream_name_by_subject(@type)
  self.class.validate_consumer_exists!(@nats, @stream, @consumer_name)
  @psub          = @nats.jetstream.pull_subscribe(@type, @consumer_name)
  @serializer    = fields.fetch(:serializer)
end

Class Method Details

.validate_consumer_exists!(nats, stream, consumer_name) ⇒ Object

nats-pure will create a consumer if the one you passed does not exist. Polyn wants to avoid this functionality and instead encourage consumer creation in the centralized ‘events` codebase so that it’s documented, discoverable, and polyn-cli can manage it


26
27
28
29
30
31
32
# File 'lib/polyn/pull_subscriber.rb', line 26

def self.validate_consumer_exists!(nats, stream, consumer_name)
  nats.jetstream.consumer_info(stream, consumer_name)
rescue NATS::JetStream::Error::NotFound
  raise Polyn::Errors::ValidationError,
    "Consumer #{consumer_name} does not exist. Use polyn-cli to create"\
    "it before attempting to subscribe"
end

Instance Method Details

#fetch(batch = 1, params = {}) ⇒ Array<NATS::Msg>

fetch makes a request to be delivered more messages from a pull consumer.

Parameters:

  • batch (Fixnum) (defaults to: 1)

    Number of messages to pull from the stream.

  • params (Hash) (defaults to: {})

    Options to customize the fetch request.

Options Hash (params):

  • :timeout (Float)

    Duration of the fetch request before it expires.

Returns:

  • (Array<NATS::Msg>)

40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/polyn/pull_subscriber.rb', line 40

def fetch(batch = 1, params = {})
  Polyn::Tracing.processing_span(@type) do |process_span|
    msgs = @psub.fetch(batch, params)
    msgs.map do |msg|
      Polyn::Tracing.subscribe_span(@type, msg, links: [process_span]) do |span|
        updated_msg = process_message(msg)
        Polyn::Tracing.span_attributes(span,
          nats:    @nats,
          type:    @type,
          event:   updated_msg.data,
          payload: msg.data)
        updated_msg
      end
    end
  end
end