Class: JetstreamBridge::InboxMessage
- Inherits:
-
Object
- Object
- JetstreamBridge::InboxMessage
- Defined in:
- lib/jetstream_bridge/consumer/inbox/inbox_message.rb
Overview
Immutable value object for a single NATS message.
Instance Attribute Summary collapse
-
#body ⇒ Object
readonly
Returns the value of attribute body.
-
#deliveries ⇒ Object
readonly
Returns the value of attribute deliveries.
-
#event_id ⇒ Object
readonly
Returns the value of attribute event_id.
-
#headers ⇒ Object
readonly
Returns the value of attribute headers.
-
#msg ⇒ Object
readonly
Returns the value of attribute msg.
-
#now ⇒ Object
readonly
Returns the value of attribute now.
-
#raw ⇒ Object
readonly
Returns the value of attribute raw.
-
#seq ⇒ Object
readonly
Returns the value of attribute seq.
-
#stream ⇒ Object
readonly
Returns the value of attribute stream.
-
#subject ⇒ Object
readonly
Returns the value of attribute subject.
Class Method Summary collapse
Instance Method Summary collapse
- #ack ⇒ Object
- #body_for_store ⇒ Object
- #data ⇒ Object
- #header ⇒ Object
-
#initialize(msg, seq, deliveries, stream, subject, headers, body, raw, event_id, now, consumer = nil) ⇒ InboxMessage
constructor
A new instance of InboxMessage.
- #metadata ⇒ Object
- #nak ⇒ Object
Constructor Details
#initialize(msg, seq, deliveries, stream, subject, headers, body, raw, event_id, now, consumer = nil) ⇒ InboxMessage
Returns a new instance of InboxMessage.
36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 36 def initialize(msg, seq, deliveries, stream, subject, headers, body, raw, event_id, now, consumer = nil) @msg = msg @seq = seq @deliveries = deliveries @stream = stream @subject = subject @headers = headers @body = body @raw = raw @event_id = event_id @now = now @consumer = consumer end |
Instance Attribute Details
#body ⇒ Object (readonly)
Returns the value of attribute body.
9 10 11 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9 def body @body end |
#deliveries ⇒ Object (readonly)
Returns the value of attribute deliveries.
9 10 11 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9 def deliveries @deliveries end |
#event_id ⇒ Object (readonly)
Returns the value of attribute event_id.
9 10 11 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9 def event_id @event_id end |
#headers ⇒ Object (readonly)
Returns the value of attribute headers.
9 10 11 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9 def headers @headers end |
#msg ⇒ Object (readonly)
Returns the value of attribute msg.
9 10 11 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9 def msg @msg end |
#now ⇒ Object (readonly)
Returns the value of attribute now.
9 10 11 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9 def now @now end |
#raw ⇒ Object (readonly)
Returns the value of attribute raw.
9 10 11 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9 def raw @raw end |
#seq ⇒ Object (readonly)
Returns the value of attribute seq.
9 10 11 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9 def seq @seq end |
#stream ⇒ Object (readonly)
Returns the value of attribute stream.
9 10 11 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9 def stream @stream end |
#subject ⇒ Object (readonly)
Returns the value of attribute subject.
9 10 11 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9 def subject @subject end |
Class Method Details
.from_nats(msg) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 11 def self.from_nats(msg) = (msg.respond_to?(:metadata) && msg.) || nil seq = .respond_to?(:stream_sequence) ? .stream_sequence : nil deliveries = .respond_to?(:num_delivered) ? .num_delivered : nil stream = .respond_to?(:stream) ? .stream : nil consumer = .respond_to?(:consumer) ? .consumer : nil subject = msg.subject.to_s headers = {} (msg.header || {}).each { |k, v| headers[k.to_s.downcase] = v } raw = msg.data body = begin Oj.load(raw, mode: :strict) rescue Oj::Error {} end id = (headers['nats-msg-id'] || body['event_id']).to_s.strip id = "seq:#{seq}" if id.empty? && seq id = SecureRandom.uuid if id.to_s.empty? new(msg, seq, deliveries, stream, subject, headers, body, raw, id, Time.now.utc, consumer) end |
Instance Method Details
#ack ⇒ Object
67 68 69 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 67 def ack(*, **) msg.ack(*, **) if msg.respond_to?(:ack) end |
#body_for_store ⇒ Object
50 51 52 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 50 def body_for_store body.empty? ? raw : body end |
#data ⇒ Object
54 55 56 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 54 def data raw end |
#header ⇒ Object
58 59 60 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 58 def header headers end |
#metadata ⇒ Object
62 63 64 65 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 62 def @metadata ||= Struct.new(:num_delivered, :sequence, :consumer, :stream) .new(deliveries, seq, @consumer, stream) end |
#nak ⇒ Object
71 72 73 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 71 def nak(*, **) msg.nak(*, **) if msg.respond_to?(:nak) end |