Class: JetstreamBridge::Publisher
- Inherits: Object
  - Object
    - JetstreamBridge::Publisher
- Defined in: lib/jetstream_bridge/publisher/publisher.rb
Overview
Publishes events to NATS JetStream with reliability features.
Events are published to subjects that follow the “env.app.sync.dest” pattern. The transactional outbox pattern is optionally supported for guaranteed delivery.
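To make the subject pattern concrete, here is a minimal sketch of how such a subject could be assembled. The helper `build_subject` and its parameter names are hypothetical and not part of JetstreamBridge; reading the four segments as environment, source app, the literal `sync`, and destination app is an assumption based on the pattern string above.

```ruby
# Hypothetical helper (not part of JetstreamBridge): joins the four segments
# of the "env.app.sync.dest" pattern into a single NATS subject.
def build_subject(env:, app:, dest:)
  segments = [env, app, 'sync', dest].map(&:to_s)

  # NATS uses "." as the token separator, so a segment must be non-empty
  # and must not itself contain a dot or whitespace.
  segments.each do |segment|
    raise ArgumentError, "invalid subject segment: #{segment.inspect}" if segment.empty? || segment.match?(/[.\s]/)
  end

  segments.join('.')
end

subject = build_subject(env: 'production', app: 'orders', dest: 'billing')
puts subject
```

Validating each segment up front keeps a malformed name from silently producing a subject with the wrong number of tokens.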
Instance Method Summary
- #do_publish(subject, envelope) ⇒ Models::PublishResult (private)
  Internal publish method that routes to the appropriate publish strategy.
- #initialize(retry_strategy: nil) ⇒ Publisher (constructor)
  Initialize a new Publisher instance.
- #publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil, **options) ⇒ Models::PublishResult
  Publishes an event to NATS JetStream.
Constructor Details
#initialize(retry_strategy: nil) ⇒ Publisher
Initialize a new Publisher instance.
Note: The NATS connection must already be established, either via JetstreamBridge.startup! or automatically on first use. The constructor raises ConnectionError if no JetStream context is available.
    # File 'lib/jetstream_bridge/publisher/publisher.rb', line 45
    def initialize(retry_strategy: nil)
      @jts = Connection.jetstream
      raise ConnectionError, 'JetStream connection not available. Call JetstreamBridge.startup! first.' unless @jts

      @retry_strategy = retry_strategy || PublisherRetryStrategy.new
    end
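The constructor's two behaviours (failing fast without a JetStream context, and falling back to a default retry strategy) can be illustrated with a self-contained stand-in. `SketchPublisher`, `DefaultRetryStrategy`, and the `jetstream:` keyword are hypothetical names introduced only for this sketch; the real class obtains its context from `Connection.jetstream` and defaults to `PublisherRetryStrategy.new`.

```ruby
# Stand-ins for illustration only; these are not the library's classes.
class ConnectionError < StandardError; end
DefaultRetryStrategy = Struct.new(:max_attempts)

class SketchPublisher
  attr_reader :retry_strategy

  # The context is injected here so the sketch runs without a NATS server.
  def initialize(jetstream:, retry_strategy: nil)
    raise ConnectionError, 'JetStream connection not available.' unless jetstream

    @jts = jetstream
    # Fall back to a default strategy when the caller supplies none.
    @retry_strategy = retry_strategy || DefaultRetryStrategy.new(3)
  end
end

default_publisher = SketchPublisher.new(jetstream: Object.new)
custom_publisher  = SketchPublisher.new(jetstream: Object.new,
                                        retry_strategy: DefaultRetryStrategy.new(5))

missing_connection_detected =
  begin
    SketchPublisher.new(jetstream: nil)
    false
  rescue ConnectionError
    true
  end
```

Accepting the retry strategy as an optional keyword lets callers (and tests) swap in their own policy without touching the publisher itself.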
Instance Method Details
#do_publish(subject, envelope) ⇒ Models::PublishResult
This method is part of a private API. Avoid calling it directly if possible, as it may be removed or changed in the future.
Internal publish method that routes to the appropriate publish strategy.
Routes to outbox-based publishing if use_outbox is enabled; otherwise publishes directly to NATS with retry logic.
    # File 'lib/jetstream_bridge/publisher/publisher.rb', line 140
    def do_publish(subject, envelope)
      if JetstreamBridge.config.use_outbox
        publish_via_outbox(subject, envelope)
      else
        with_retries { publish_to_nats(subject, envelope) }
      end
    end
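The routing above can be sketched in isolation as follows. Everything here is a stand-in: `SketchRouter`, `Config`, the fixed attempt limit, and the simulated transient failure are assumptions for illustration, and the library's actual `with_retries`, `publish_via_outbox`, and `publish_to_nats` are not shown in this page and may behave differently.

```ruby
# Stand-in configuration object; only the use_outbox flag is modelled.
Config = Struct.new(:use_outbox)

class SketchRouter
  attr_reader :nats_calls

  def initialize(config, max_attempts: 3)
    @config = config
    @max_attempts = max_attempts
    @nats_calls = 0
  end

  # Same branching as do_publish: outbox first, otherwise direct with retries.
  def do_publish(subject, envelope)
    if @config.use_outbox
      publish_via_outbox(subject, envelope)
    else
      with_retries { publish_to_nats(subject, envelope) }
    end
  end

  private

  # Assumed retry policy: re-run the block until it succeeds or the
  # attempt limit is reached, then let the last error propagate.
  def with_retries
    attempts = 0
    begin
      attempts += 1
      yield
    rescue StandardError
      retry if attempts < @max_attempts
      raise
    end
  end

  def publish_via_outbox(subject, _envelope)
    [:outbox, subject]
  end

  # Fails on the first call to simulate a transient broker error.
  def publish_to_nats(subject, _envelope)
    @nats_calls += 1
    raise IOError, 'simulated transient failure' if @nats_calls < 2

    [:nats, subject]
  end
end

direct = SketchRouter.new(Config.new(false))
direct_result = direct.do_publish('dev.orders.sync.billing', {})

outbox = SketchRouter.new(Config.new(true))
outbox_result = outbox.do_publish('dev.orders.sync.billing', {})
```

In this sketch only the direct path is wrapped in retries; the outbox path is assumed to defer delivery to whatever later drains the outbox.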
#publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil, **options) ⇒ Models::PublishResult
Publishes an event to NATS JetStream.
Supports multiple usage patterns for flexibility:
- Structured parameters (recommended): publish(resource_type: 'user', event_type: 'created', payload: { id: 1, name: 'Ada' })
- Hash/envelope with dot notation (auto-infers resource_type): publish(event_type: 'user.created', payload: …)
- Complete envelope (advanced): publish({ event_type: 'created', resource_type: 'user', payload: …, event_id: '…' })
When use_outbox is enabled, events are persisted to the database first for reliability. The event_id is used for deduplication via the NATS message ID header (Nats-Msg-Id).
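As an illustration of the dot-notation pattern, the following sketch shows one way a resource type could be inferred from an event type such as 'user.created'. The helper `infer_resource_type` is hypothetical; how the library actually performs the inference, and whether it also rewrites event_type, is not shown here.

```ruby
# Hypothetical helper (not the library's implementation): takes the text
# before the first dot as the resource type, or nil when there is no
# usable prefix.
def infer_resource_type(event_type)
  prefix, separator, _rest = event_type.to_s.partition('.')
  return nil if separator.empty? || prefix.empty?

  prefix
end

inferred   = infer_resource_type('user.created')
no_prefix  = infer_resource_type('created')
empty_head = infer_resource_type('.created')
```

Returning nil instead of guessing leaves it to the caller to demand an explicit resource_type when the event type carries no prefix.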
    # File 'lib/jetstream_bridge/publisher/publisher.rb', line 110
    def publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil, **)
      ensure_destination_app_configured!

      params = { event_or_hash: event_or_hash, resource_type: resource_type, event_type: event_type,
                 payload: payload, subject: subject, options: }

      envelope, resolved_subject = route_publish_params(params)
      do_publish(resolved_subject, envelope)
    rescue ArgumentError
      # Re-raise validation errors for invalid parameters
      raise
    rescue StandardError => e
      # Return failure result for publishing errors
      Models::PublishResult.new(
        success: false,
        event_id: envelope&.[]('event_id') || 'unknown',
        subject: resolved_subject || 'unknown',
        error: e
      )
    end
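The error handling in this method has a consequence for callers: invalid parameters raise ArgumentError, while publishing failures come back as an unsuccessful result rather than an exception. The sketch below reproduces that contract with stand-ins. `PublishResult` here is a plain Struct using the same four keywords as the listing, and `sketch_publish`, the fixed event ID, and the fixed subject are hypothetical.

```ruby
# Stand-in result type with the four keywords seen in the source listing.
PublishResult = Struct.new(:success, :event_id, :subject, :error, keyword_init: true)

# Hypothetical publish: the block plays the role of the transport call.
def sketch_publish(event_type:, payload:)
  envelope = nil
  resolved_subject = nil
  raise ArgumentError, 'event_type is required' if event_type.to_s.empty?

  envelope = { 'event_id' => 'evt-1', 'event_type' => event_type, 'payload' => payload }
  resolved_subject = 'dev.orders.sync.billing'
  yield resolved_subject, envelope
  PublishResult.new(success: true, event_id: envelope['event_id'], subject: resolved_subject, error: nil)
rescue ArgumentError
  # Validation errors propagate to the caller.
  raise
rescue StandardError => e
  # Any other failure is reported through the result object.
  PublishResult.new(
    success: false,
    event_id: envelope&.[]('event_id') || 'unknown',
    subject: resolved_subject || 'unknown',
    error: e
  )
end

failed = sketch_publish(event_type: 'user.created', payload: { id: 1 }) do |_subject, _envelope|
  raise IOError, 'broker unreachable'
end

invalid_raised =
  begin
    sketch_publish(event_type: '', payload: {}) { |_subject, _envelope| nil }
    false
  rescue ArgumentError
    true
  end
```

Callers therefore need to check the success flag on the returned result, not just wrap the call in a rescue.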