Class: JetstreamBridge::Publisher

Inherits:
Object
  • Object
Defined in:
lib/jetstream_bridge/publisher/publisher.rb

Overview

Publishes events to NATS JetStream with reliability features.

Publishes events to subjects that follow the “env.app.sync.dest” pattern. Supports an optional transactional outbox pattern for guaranteed delivery.

Examples:

Basic publishing

publisher = JetstreamBridge::Publisher.new
result = publisher.publish(
  resource_type: "user",
  event_type: "created",
  payload: { id: 1, email: "[email protected]" }
)
puts "Published: #{result.event_id}" if result.success?

Publishing with custom retry strategy

custom_strategy = MyRetryStrategy.new(max_attempts: 5)
publisher = JetstreamBridge::Publisher.new(retry_strategy: custom_strategy)
result = publisher.publish(event_type: "user.created", payload: { id: 1 })

Using convenience method

JetstreamBridge.publish(event_type: "user.created", payload: { id: 1 })

Constructor Details

#initialize(retry_strategy: nil) ⇒ Publisher

Initialize a new Publisher instance.

Note: The constructor assumes the NATS connection is already established, either explicitly via JetstreamBridge.startup! or automatically on first use.

Parameters:

  • retry_strategy (defaults to: nil)

    Optional custom retry strategy for handling transient failures. Defaults to PublisherRetryStrategy with exponential backoff.

Raises:

  • ConnectionError: If the JetStream connection is not available



# File 'lib/jetstream_bridge/publisher/publisher.rb', line 45

def initialize(retry_strategy: nil)
  @jts = Connection.jetstream
  raise ConnectionError, 'JetStream connection not available. Call JetstreamBridge.startup! first.' unless @jts

  @retry_strategy = retry_strategy || PublisherRetryStrategy.new
end
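
The interface that a custom retry strategy must implement is not shown in this section. The following is only a sketch of what a strategy with exponential backoff might look like. The class name SketchRetryStrategy, its execute and delay_for methods, and the injectable sleeper are assumptions made for illustration and are not part of the documented API.

```ruby
# Hypothetical retry strategy; the interface JetstreamBridge expects is an assumption.
class SketchRetryStrategy
  attr_reader :max_attempts, :base_delay

  # sleeper is injectable so the sketch can be exercised without real waiting.
  def initialize(max_attempts: 3, base_delay: 0.1, sleeper: ->(seconds) { sleep(seconds) })
    @max_attempts = max_attempts
    @base_delay = base_delay
    @sleeper = sleeper
  end

  # Delay before the retry that follows the given (1-based) failed attempt.
  def delay_for(attempt)
    base_delay * (2**(attempt - 1))
  end

  # Runs the block, retrying on StandardError until max_attempts is reached.
  def execute
    attempt = 0
    begin
      attempt += 1
      yield
    rescue StandardError
      raise if attempt >= max_attempts

      @sleeper.call(delay_for(attempt))
      retry
    end
  end
end
```

Doubling the delay after each failed attempt gives a struggling server time to recover, and the max_attempts cap keeps a permanent failure from blocking the caller indefinitely.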

Instance Method Details

#do_publish(subject, envelope) ⇒ Models::PublishResult

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal publish method that routes to appropriate publish strategy.

Routes to outbox-based publishing if use_outbox is enabled, otherwise publishes directly to NATS with retry logic.

Parameters:

  • subject: NATS subject to publish to

  • envelope: Complete event envelope

Returns:

  • Models::PublishResult: Result object

API:

  • private



# File 'lib/jetstream_bridge/publisher/publisher.rb', line 140

def do_publish(subject, envelope)
  if JetstreamBridge.config.use_outbox
    publish_via_outbox(subject, envelope)
  else
    with_retries { publish_to_nats(subject, envelope) }
  end
end
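
The outbox path (publish_via_outbox) is not shown in this section. As a rough illustration of the general transactional outbox idea, and not of the gem's implementation, the sketch below persists a record before attempting delivery and then records the outcome. OutboxRecord, SketchOutbox, and the in-memory hash standing in for a database table are all hypothetical.

```ruby
# In-memory stand-in for an outbox table; a real outbox would use a database.
OutboxRecord = Struct.new(:event_id, :subject, :envelope, :status, :last_error, keyword_init: true)

class SketchOutbox
  attr_reader :records

  def initialize
    @records = {}
  end

  # Persist the event first, then attempt delivery and record the outcome.
  def publish(subject, envelope)
    record = OutboxRecord.new(event_id: envelope.fetch('event_id'), subject: subject,
                              envelope: envelope, status: :pending)
    @records[record.event_id] = record

    begin
      yield(subject, envelope) # caller-supplied delivery, e.g. a NATS publish
      record.status = :sent
    rescue StandardError => e
      record.status = :failed
      record.last_error = e.message
    end
    record
  end

  # Records that still need delivery, e.g. for a background retry job.
  def undelivered
    @records.values.reject { |r| r.status == :sent }
  end
end
```

Because the record exists before delivery is attempted, a crash or broker outage leaves a pending or failed row that a later job can pick up instead of losing the event.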

#publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil, **options) ⇒ Models::PublishResult

Publishes an event to NATS JetStream.

Supports multiple usage patterns for flexibility:

  1. Structured parameters (recommended): publish(resource_type: 'user', event_type: 'created', payload: { id: 1, name: 'Ada' })

  2. Hash/envelope with dot notation (resource_type is inferred from event_type): publish(event_type: 'user.created', payload: …)

  3. Complete envelope (advanced): publish({ event_type: 'created', resource_type: 'user', payload: …, event_id: '…' })

When use_outbox is enabled, events are persisted to the database first for reliability. The event_id is used for deduplication via the NATS message ID header.
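
To make pattern 2 and the deduplication note more concrete, here is a minimal sketch. The module EventTypeSketch and its two helpers are hypothetical and may differ from how the gem infers resource_type internally. The only detail taken from NATS itself is the Nats-Msg-Id header name, which JetStream uses to detect duplicate messages.

```ruby
# Hypothetical helpers; not part of the JetstreamBridge API.
module EventTypeSketch
  module_function

  # Splits "user.created" into ["user", "created"].
  # Returns [nil, event_type] when there is no dot, so nothing is inferred.
  def split_event_type(event_type)
    resource, event = event_type.to_s.split('.', 2)
    return [nil, resource] if event.nil?

    [resource, event]
  end

  # JetStream deduplicates on the Nats-Msg-Id header within the
  # stream's duplicate window.
  def dedup_headers(event_id)
    { 'Nats-Msg-Id' => event_id.to_s }
  end
end
```

Reusing the same event_id when republishing is what lets the server recognize the second message as a duplicate.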

Examples:

Structured parameters

result = publisher.publish(
  resource_type: "user",
  event_type: "created",
  payload: { id: 1, email: "[email protected]" }
)
puts "Published: #{result.event_id}" if result.success?

With options

result = publisher.publish(
  event_type: "user.created",
  payload: { id: 1 },
  event_id: "custom-id-123",
  trace_id: request_id,
  occurred_at: Time.now.utc
)

Error handling

result = publisher.publish(event_type: "order.created", payload: { id: 1 })
if result.failure?
  logger.error "Failed to publish: #{result.error.message}"
end

Parameters:

  • event_or_hash (defaults to: nil)

    Complete event envelope (if using pattern 3)

  • resource_type (defaults to: nil)

    Resource type (e.g., 'user', 'order'). Required for pattern 1.

  • event_type (defaults to: nil)

    Event type (e.g., 'created', 'user.created'). Required for all patterns.

  • payload (defaults to: nil)

    Event payload data. Required for all patterns.

  • subject (defaults to: nil)

    Optional NATS subject override. Defaults to config.source_subject.

  • options: Additional options:

    • event_id [String] Custom event ID (auto-generated if not provided)

    • trace_id [String] Distributed trace ID

    • occurred_at [Time, String] Event timestamp (defaults to current time)

Returns:

  • Models::PublishResult: Result object containing:

    • success [Boolean] Whether publish succeeded

    • event_id [String] The published event ID

    • subject [String] NATS subject used

    • error [Exception, nil] Error if publish failed

    • duplicate [Boolean] Whether NATS detected the message as a duplicate

Raises:

  • ArgumentError: If required parameters are missing or invalid



# File 'lib/jetstream_bridge/publisher/publisher.rb', line 110

def publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil, **options)
  ensure_destination_app_configured!

  params = { event_or_hash: event_or_hash, resource_type: resource_type, event_type: event_type,
             payload: payload, subject: subject, options: options }
  envelope, resolved_subject = route_publish_params(params)

  do_publish(resolved_subject, envelope)
rescue ArgumentError
  # Re-raise validation errors for invalid parameters
  raise
rescue StandardError => e
  # Return failure result for publishing errors
  Models::PublishResult.new(
    success: false,
    event_id: envelope&.[]('event_id') || 'unknown',
    subject: resolved_subject || 'unknown',
    error: e
  )
end
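
The rescue clauses above give callers two failure paths: ArgumentError propagates as an exception, while any other StandardError is returned inside a failure result. The sketch below reproduces that contract with stand-ins so it can run on its own. SketchResult, sketch_publish, and the fixed event ID are hypothetical and only mirror the structure of the method shown above.

```ruby
# Stand-in for Models::PublishResult, limited to the fields listed above.
SketchResult = Struct.new(:success, :event_id, :subject, :error, :duplicate, keyword_init: true) do
  def success?
    success == true
  end

  def failure?
    !success?
  end
end

# Mirrors the rescue structure of #publish: validation errors propagate,
# any other StandardError becomes a failure result.
def sketch_publish(event_type:, payload:)
  raise ArgumentError, 'event_type is required' if event_type.to_s.empty?

  event_id = 'evt-1' # fixed ID for illustration only
  yield(event_type, payload) # caller-supplied delivery step
  SketchResult.new(success: true, event_id: event_id, subject: 'unknown', duplicate: false)
rescue ArgumentError
  raise
rescue StandardError => e
  SketchResult.new(success: false, event_id: event_id || 'unknown', subject: 'unknown', error: e)
end
```

In practice this means callers should check result.failure? for delivery problems and reserve begin/rescue ArgumentError for programming errors such as a missing event_type.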