Class: LocalBus::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/local_bus/message.rb

Overview

Represents a message in the LocalBus system

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic, timeout: nil, **payload) ⇒ Message

Note:

Creates a new Message instance with the given topic and payload

Constructor



11
12
13
14
15
16
17
18
19
20
# File 'lib/local_bus/message.rb', line 11

# Creates a new Message and records its metadata as a frozen Hash.
#
# @param topic [#to_s] topic of the message; stored as a frozen String
# @param timeout [#to_f, nil] processing timeout in seconds; stored via to_f (nil becomes 0.0)
# @param payload [Hash] remaining keyword arguments; keys are converted to Symbols
#   and the resulting Hash is frozen
# @return [Message]
def initialize(topic, timeout: nil, **payload)
  # ||= leaves any metadata that was already assigned untouched
  @metadata ||= begin
    fields = {}
    fields[:id] = SecureRandom.uuid_v7
    fields[:topic] = topic.to_s.freeze
    fields[:payload] = payload.transform_keys(&:to_sym).freeze
    fields[:created_at] = Time.now
    fields[:thread_id] = Thread.current.object_id
    fields[:timeout] = timeout.to_f
    fields.freeze
  end
end

Instance Attribute Details

#metadata ⇒ Object (readonly) Also known as: to_h

Metadata for the message



24
25
26
# File 'lib/local_bus/message.rb', line 24

# Metadata for the message
#
# @return [Hash] the Hash stored in @metadata
def metadata
  @metadata
end

#publication ⇒ Object

Note:

May be nil if processing hasn’t happened yet (e.g. it was published via Station)

Publication representing the Async barrier and subscribers handling the message



29
30
31
# File 'lib/local_bus/message.rb', line 29

# Publication representing the Async barrier and subscribers handling the message
#
# @return [Object, nil] the value held in @publication; nil when none has been assigned
def publication = @publication

Instance Method Details

#created_at ⇒ Object

Time when the message was created or published



51
52
53
# File 'lib/local_bus/message.rb', line 51

# Time when the message was created or published
#
# @return [Time] the :created_at entry of the message metadata
def created_at
  # Read from the metadata Hash; a bare [:created_at] would just build a one-element Array
  @metadata[:created_at]
end

#deconstruct_keys(keys) ⇒ Object

Allows pattern matching on message attributes



96
97
98
# File 'lib/local_bus/message.rb', line 96

# Allows pattern matching on message attributes
#
# @param keys [Array<Symbol>, nil] keys requested by the pattern; Ruby passes nil
#   when the pattern captures the remainder with **rest
# @return [Hash] the requested slice of to_h, or all of to_h when keys is nil or empty
def deconstruct_keys(keys)
  # Safe navigation covers the nil case (e.g. `in {topic:, **rest}`), which would
  # otherwise raise NoMethodError on nil.any?
  keys&.any? ? to_h.slice(*keys) : to_h
end

#id ⇒ Object

Unique identifier for the message



33
34
35
# File 'lib/local_bus/message.rb', line 33

# Unique identifier for the message
#
# @return [Object] the :id entry of the message metadata
def id
  # Read from the metadata Hash; a bare [:id] would just build a one-element Array
  @metadata[:id]
end

#payload ⇒ Object

Message payload



45
46
47
# File 'lib/local_bus/message.rb', line 45

# Message payload
#
# @return [Hash] the :payload entry of the message metadata
def payload
  # Read from the metadata Hash; a bare [:payload] would just build a one-element Array
  @metadata[:payload]
end

#subscribers ⇒ Object

Blocks and waits for the message to be processed, then returns all subscribers



84
85
86
87
# File 'lib/local_bus/message.rb', line 84

# Blocks until the message has been processed, then returns its subscribers.
#
# @return [Object] whatever the publication reports from #subscribers
def subscribers
  wait # returns only after a publication exists and its own wait has finished
  handled_by = publication
  handled_by.subscribers
end

#thread_id ⇒ Object

ID of the thread that created the message



57
58
59
# File 'lib/local_bus/message.rb', line 57

# ID of the thread that created the message
#
# @return [Object] the :thread_id entry of the message metadata
def thread_id
  # Read from the metadata Hash; a bare [:thread_id] would just build a one-element Array
  @metadata[:thread_id]
end

#timeout ⇒ Object

Timeout for message processing (in seconds)



63
64
65
# File 'lib/local_bus/message.rb', line 63

# Timeout for message processing (in seconds)
#
# @return [Object] the :timeout entry of the message metadata
def timeout
  # Read from the metadata Hash; a bare [:timeout] would just build a one-element Array
  @metadata[:timeout]
end

#topic ⇒ Object

Message topic



39
40
41
# File 'lib/local_bus/message.rb', line 39

# Message topic
#
# @return [Object] the :topic entry of the message metadata
def topic
  # Read from the metadata Hash; a bare [:topic] would just build a one-element Array
  @metadata[:topic]
end

#wait(interval: 0.1) ⇒ Object

Blocks and waits for the message to process



70
71
72
73
74
75
76
77
78
79
80
# File 'lib/local_bus/message.rb', line 70

# Blocks until a publication is available, then waits on that publication.
#
# @param interval [Numeric] period handed to the recurring no-op timer that paces
#   the polling loop (presumably seconds — confirm against the timers gem)
# @return [Object, nil] the result of publication.wait
def wait(interval: 0.1)
  @timers ||= begin
    group = Timers::Group.new
    group.every(interval) {} # empty recurring timer; it only gives @timers.wait something to wait for
    group
  end

  # Poll until a publication has been assigned
  loop { publication ? break : @timers.wait }

  publication&.wait
ensure
  # Always release the timer group, even when waiting raised
  @timers&.cancel
  @timers = nil
end