Class: JetstreamBridge::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/jetstream_bridge/publisher/publisher.rb

Overview

Publishes events to NATS JetStream with reliability features.

Publishes events to “env.app.sync.dest” subject pattern. Supports optional transactional outbox pattern for guaranteed delivery.

Examples:

Basic publishing

publisher = JetstreamBridge::Publisher.new
result = publisher.publish(
  resource_type: "user",
  event_type: "created",
  payload: { id: 1, email: "[email protected]" }
)
puts "Published: #{result.event_id}" if result.success?

Publishing with custom retry strategy

custom_strategy = MyRetryStrategy.new(max_attempts: 5)
publisher = JetstreamBridge::Publisher.new(retry_strategy: custom_strategy)
result = publisher.publish(event_type: "user.created", payload: { id: 1 })

Using convenience method

JetstreamBridge.publish(event_type: "user.created", payload: { id: 1 })

Instance Method Summary collapse

Constructor Details

#initialize(retry_strategy: nil) ⇒ Publisher

Initialize a new Publisher instance.

Note: The NATS connection should already be established via JetstreamBridge.startup! or automatically on first use. This assumes the connection is already established.

Raises:



45
46
47
48
49
50
# File 'lib/jetstream_bridge/publisher/publisher.rb', line 45

def initialize(retry_strategy: nil)
  @jts = Connection.jetstream
  raise ConnectionError, 'JetStream connection not available. Call JetstreamBridge.startup! first.' unless @jts

  @retry_strategy = retry_strategy || PublisherRetryStrategy.new
end

Instance Method Details

#do_publish(subject, envelope) ⇒ Models::PublishResult

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal publish method that routes to appropriate publish strategy.

Routes to outbox-based publishing if use_outbox is enabled, otherwise publishes directly to NATS with retry logic.



140
141
142
143
144
145
146
# File 'lib/jetstream_bridge/publisher/publisher.rb', line 140

def do_publish(subject, envelope)
  if JetstreamBridge.config.use_outbox
    publish_via_outbox(subject, envelope)
  else
    with_retries { publish_to_nats(subject, envelope) }
  end
end

#publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil, **options) ⇒ Models::PublishResult

Publishes an event to NATS JetStream.

Supports multiple usage patterns for flexibility:

  1. Structured parameters (recommended): publish(resource_type: ‘user’, event_type: ‘created’, payload: { id: 1, name: ‘Ada’ })

  2. Hash/envelope with dot notation (auto-infers resource_type): publish(event_type: ‘user.created’, payload: …)

  3. Complete envelope (advanced): publish({ event_type: ‘created’, resource_type: ‘user’, payload: …, event_id: ‘…’ })

When use_outbox is enabled, events are persisted to database first for reliability. The event_id is used for deduplication via NATS message ID header.

Examples:

Structured parameters

result = publisher.publish(
  resource_type: "user",
  event_type: "created",
  payload: { id: 1, email: "[email protected]" }
)
puts "Published: #{result.event_id}" if result.success?

With options

result = publisher.publish(
  event_type: "user.created",
  payload: { id: 1 },
  event_id: "custom-id-123",
  trace_id: request_id,
  occurred_at: Time.now.utc
)

Error handling

result = publisher.publish(event_type: "order.created", payload: { id: 1 })
if result.failure?
  logger.error "Failed to publish: #{result.error.message}"
end

Raises:

  • (ArgumentError)

    If required parameters are missing or invalid



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/jetstream_bridge/publisher/publisher.rb', line 110

def publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil, **options)
  ensure_destination_app_configured!

  params = { event_or_hash: event_or_hash, resource_type: resource_type, event_type: event_type,
             payload: payload, subject: subject, options: options }
  envelope, resolved_subject = route_publish_params(params)

  do_publish(resolved_subject, envelope)
rescue ArgumentError
  # Re-raise validation errors for invalid parameters
  raise
rescue StandardError => e
  # Return failure result for publishing errors
  Models::PublishResult.new(
    success: false,
    event_id: envelope&.[]('event_id') || 'unknown',
    subject: resolved_subject || 'unknown',
    error: e
  )
end