Class: Polyn::Conn

Inherits:
Object
  • Object
show all
Defined in:
lib/polyn.rb

Overview

A Polyn connection to NATS

Instance Method Summary collapse

Constructor Details

#initialize(nats, **opts) ⇒ Conn

Returns a new instance of Conn.



69
70
71
72
73
74
75
# File 'lib/polyn.rb', line 69

def initialize(nats, **opts)
  @nats         = nats_class.new(nats)
  # Schema store nats has to be a real one, not a mock, because
  # the only place to load the schemas is from a running nats-server
  @schema_store = opts[:schema_store] || schema_store(nats, **opts)
  @serializer   = Polyn::Serializers::Json.new(@schema_store)
end

Instance Method Details

#publish(type, data, **opts) ⇒ Object

Publishes a message on the Polyn network.

Will use information from the event to build up the ‘polyntrace` data

Parameters:

  • type (String)

    The type of event

  • data (any)

    The data to include in the event

  • options (Hash)

    a customizable set of options



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/polyn.rb', line 86

def publish(type, data, **opts)
  Polyn::Tracing.publish_span(type) do |span|
    event = Event.new({
      type:   type,
      source: opts[:source],
      data:   data,
    })

    json = @serializer.serialize!(event)

    Polyn::Tracing.span_attributes(span,
      nats:    @nats.nats,
      type:    type,
      event:   event,
      payload: json)

    header = add_headers(opts.fetch(:header, {}), event)

    @nats.publish(type, json, opts[:reply_to], header: header)
  end
end

#pull_subscribe(type, **opts) ⇒ Object

Subscribe to a pull consumer that already exists in the NATS server

is more than the ‘source_root`

Parameters:

  • nats (Object)

    Connected NATS instance from ‘NATS.connect`

  • type (String)

    The type of event

  • options (Hash)

    a customizable set of options



140
141
142
143
144
145
146
147
# File 'lib/polyn.rb', line 140

def pull_subscribe(type, **opts)
  Polyn::PullSubscriber.new({
    nats:       @nats,
    type:       type,
    source:     opts[:source],
    serializer: @serializer,
  })
end

#subscribe(type, opts = {}, &callback) ⇒ Object

Create subscription which is dispatched asynchronously and sends messages to a callback.

Parameters:

  • type (String)

    The type of event

  • options (Hash)

    a customizable set of options



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/polyn.rb', line 116

def subscribe(type, opts = {}, &callback)
  @nats.subscribe(type, opts) do |msg|
    Polyn::Tracing.subscribe_span(type, msg) do |span|
      event    = @serializer.deserialize!(msg.data)

      Polyn::Tracing.span_attributes(span,
        nats:    @nats.nats,
        type:    type,
        event:   event,
        payload: msg.data)

      msg.data = event
      callback.call(msg)
    end
  end
end