Class: JetstreamBridge::InboxMessage

Inherits:
Object
  • Object
show all
Defined in:
lib/jetstream_bridge/consumer/inbox/inbox_message.rb

Overview

Immutable value object for a single NATS message.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(msg, seq, deliveries, stream, subject, headers, body, raw, event_id, now, consumer = nil) ⇒ InboxMessage

Returns a new instance of InboxMessage.



36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 36

# Stores every constructor argument in the instance variable of the same name.
#
# @param msg        [Object] the underlying message object
# @param seq        [Object, nil] value stored in @seq
# @param deliveries [Object, nil] value stored in @deliveries
# @param stream     [Object, nil] value stored in @stream
# @param subject    [Object] value stored in @subject
# @param headers    [Object] value stored in @headers
# @param body       [Object] value stored in @body
# @param raw        [Object] value stored in @raw
# @param event_id   [Object] value stored in @event_id
# @param now        [Object] value stored in @now
# @param consumer   [Object, nil] value stored in @consumer (defaults to nil)
def initialize(msg, seq, deliveries, stream, subject, headers, body, raw, event_id, now, consumer = nil)
  @msg, @seq, @deliveries     = msg, seq, deliveries
  @stream, @subject           = stream, subject
  @headers, @body, @raw       = headers, body, raw
  @event_id, @now, @consumer  = event_id, now, consumer
end

Instance Attribute Details

#body ⇒ Object (readonly)

Returns the value of attribute body.



9
10
11
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9

# Reader for the +body+ attribute.
#
# @return [Object] the value held in @body
def body = @body

#deliveries ⇒ Object (readonly)

Returns the value of attribute deliveries.



9
10
11
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9

# Reader for the +deliveries+ attribute.
#
# @return [Object] the value held in @deliveries
def deliveries = @deliveries

#event_id ⇒ Object (readonly)

Returns the value of attribute event_id.



9
10
11
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9

# Reader for the +event_id+ attribute.
#
# @return [Object] the value held in @event_id
def event_id = @event_id

#headers ⇒ Object (readonly)

Returns the value of attribute headers.



9
10
11
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9

# Reader for the +headers+ attribute.
#
# @return [Object] the value held in @headers
def headers = @headers

#msg ⇒ Object (readonly)

Returns the value of attribute msg.



9
10
11
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9

# Reader for the +msg+ attribute.
#
# @return [Object] the value held in @msg
def msg = @msg

#now ⇒ Object (readonly)

Returns the value of attribute now.



9
10
11
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9

# Reader for the +now+ attribute.
#
# @return [Object] the value held in @now
def now = @now

#raw ⇒ Object (readonly)

Returns the value of attribute raw.



9
10
11
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9

# Reader for the +raw+ attribute.
#
# @return [Object] the value held in @raw
def raw = @raw

#seq ⇒ Object (readonly)

Returns the value of attribute seq.



9
10
11
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9

# Reader for the +seq+ attribute.
#
# @return [Object] the value held in @seq
def seq = @seq

#stream ⇒ Object (readonly)

Returns the value of attribute stream.



9
10
11
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9

# Reader for the +stream+ attribute.
#
# @return [Object] the value held in @stream
def stream = @stream

#subject ⇒ Object (readonly)

Returns the value of attribute subject.



9
10
11
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 9

# Reader for the +subject+ attribute.
#
# @return [Object] the value held in @subject
def subject = @subject

Class Method Details

.from_nats(msg) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 11

# Builds an InboxMessage from an incoming message object.
#
# Metadata fields (stream sequence, delivery count, stream, consumer) are
# read only when the message exposes +metadata+ and that object responds to
# the matching reader; otherwise they are nil. Header keys are stringified
# and downcased. The payload is parsed as strict JSON; a payload Oj rejects
# yields an empty Hash body.
#
# The event id is resolved in this order:
# 1. the 'nats-msg-id' header,
# 2. the 'event_id' key of the parsed body (only when the body is a Hash),
# 3. "seq:<stream sequence>" when a sequence is available,
# 4. a random UUID.
#
# @param msg [#subject, #header, #data] incoming message; +metadata+ is optional
# @return [InboxMessage]
def self.from_nats(msg)
  meta       = (msg.respond_to?(:metadata) && msg.metadata) || nil
  seq        = meta.respond_to?(:stream_sequence) ? meta.stream_sequence : nil
  deliveries = meta.respond_to?(:num_delivered)   ? meta.num_delivered   : nil
  stream     = meta.respond_to?(:stream)          ? meta.stream          : nil
  consumer   = meta.respond_to?(:consumer)        ? meta.consumer        : nil
  subject    = msg.subject.to_s

  headers = {}
  (msg.header || {}).each { |k, v| headers[k.to_s.downcase] = v }

  raw  = msg.data
  body = begin
    Oj.load(raw, mode: :strict)
  rescue Oj::Error
    {}
  end

  # Valid JSON is not necessarily an object (arrays, numbers, null); indexing
  # those with a String key would raise, so only Hash bodies are consulted.
  body_event_id = body.is_a?(Hash) ? body['event_id'] : nil

  id = (headers['nats-msg-id'] || body_event_id).to_s.strip
  id = "seq:#{seq}" if id.empty? && seq
  id = SecureRandom.uuid if id.empty?

  new(msg, seq, deliveries, stream, subject, headers, body, raw, id, Time.now.utc, consumer)
end

Instance Method Details

#ack ⇒ Object



67
68
69
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 67

# Forwards an acknowledgement, with all positional and keyword arguments, to
# the wrapped message.
#
# @return [Object, nil] whatever +msg.ack+ returns, or nil when the wrapped
#   message does not respond to +ack+
def ack(*, **)
  return unless msg.respond_to?(:ack)

  msg.ack(*, **)
end

#body_for_store ⇒ Object



50
51
52
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 50

# Picks the representation to persist: the parsed body when it has content,
# otherwise the raw payload.
#
# @return [Object] +raw+ when +body+ is empty, +body+ otherwise
def body_for_store
  return raw if body.empty?

  body
end

#data ⇒ Object



54
55
56
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 54

# Alias-style reader that returns the same value as +raw+.
#
# @return [Object] the result of +raw+
def data = raw

#header ⇒ Object



58
59
60
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 58

# Alias-style reader that returns the same value as +headers+.
#
# @return [Object] the result of +headers+
def header = headers

#metadata ⇒ Object



62
63
64
65
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 62

# Returns a Struct exposing this message's delivery metadata through the
# readers +num_delivered+, +sequence+, +consumer+ and +stream+.
#
# The Struct is built on first call and memoized in @metadata, so later
# calls return the same object.
#
# @return [Struct] populated from +deliveries+, +seq+, @consumer and +stream+
def metadata
  @metadata ||= Struct.new(:num_delivered, :sequence, :consumer, :stream)
                      .new(deliveries, seq, @consumer, stream)
end

#nak ⇒ Object



71
72
73
# File 'lib/jetstream_bridge/consumer/inbox/inbox_message.rb', line 71

# Forwards a negative acknowledgement, with all positional and keyword
# arguments, to the wrapped message.
#
# @return [Object, nil] whatever +msg.nak+ returns, or nil when the wrapped
#   message does not respond to +nak+
def nak(*, **)
  return unless msg.respond_to?(:nak)

  msg.nak(*, **)
end