Class: LocalBus::Message
- Inherits: Object
  - Object
  - LocalBus::Message
- Defined in: lib/local_bus/message.rb
Overview
Represents a message in the LocalBus system
Instance Attribute Summary
- #metadata ⇒ Object (readonly, also: #to_h)
  Metadata for the message.
- #publication ⇒ Object
  Publication representing the Async barrier and subscribers handling the message.
Instance Method Summary
- #created_at ⇒ Object
  Time when the message was created or published.
- #deconstruct_keys(keys) ⇒ Object
  Allows pattern matching on message attributes.
- #id ⇒ Object
  Unique identifier for the message.
- #initialize(topic, timeout: nil, **payload) ⇒ Message (constructor)
  Constructor.
- #payload ⇒ Object
  Message payload.
- #subscribers ⇒ Object
  Blocks and waits for the message to process, then returns all subscribers.
- #thread_id ⇒ Object
  ID of the thread that created the message.
- #timeout ⇒ Object
  Timeout for message processing (in seconds).
- #topic ⇒ Object
  Message topic.
- #wait(interval: 0.1) ⇒ Object
  Blocks and waits for the message to process.
Constructor Details
#initialize(topic, timeout: nil, **payload) ⇒ Message
Creates a new Message instance with the given topic and payload

# File 'lib/local_bus/message.rb', line 11

def initialize(topic, timeout: nil, **payload)
  @metadata ||= {
    id: SecureRandom.uuid_v7,
    topic: topic.to_s.freeze,
    payload: payload.transform_keys(&:to_sym).freeze,
    created_at: Time.now,
    thread_id: Thread.current.object_id,
    timeout: timeout.to_f
  }.freeze
end
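To make the constructor's normalization concrete, here is a minimal sketch that rebuilds the same metadata hash outside the class. `build_metadata` is a hypothetical helper, not part of LocalBus, and the fallback to `SecureRandom.uuid` is an assumption added so the sketch also runs on Rubies without `uuid_v7`.

```ruby
require "securerandom"

# Hypothetical helper mirroring the constructor body; not part of LocalBus.
def build_metadata(topic, timeout: nil, **payload)
  # Assumption: fall back to a random UUID where uuid_v7 is unavailable.
  id = SecureRandom.respond_to?(:uuid_v7) ? SecureRandom.uuid_v7 : SecureRandom.uuid
  {
    id: id,
    topic: topic.to_s.freeze,                          # symbols become strings
    payload: payload.transform_keys(&:to_sym).freeze,  # keys are symbolized
    created_at: Time.now,
    thread_id: Thread.current.object_id,
    timeout: timeout.to_f                              # nil becomes 0.0
  }.freeze
end

metadata = build_metadata(:"user.created", user_id: 123)
metadata[:topic]   # => "user.created"
metadata[:timeout] # => 0.0
metadata.frozen?   # => true
```

Note that `freeze` is shallow: the metadata and payload hashes cannot be modified, but mutable values stored inside the payload can still change.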
Instance Attribute Details
#metadata ⇒ Object (readonly) Also known as: to_h
Metadata for the message
# File 'lib/local_bus/message.rb', line 24

def metadata
  @metadata
end
#publication ⇒ Object
Publication representing the Async barrier and subscribers handling the message

Note: May be nil if processing hasn't happened yet (e.g. the message was published via Station)

# File 'lib/local_bus/message.rb', line 29

def publication
  @publication
end
Instance Method Details
#created_at ⇒ Object
Time when the message was created or published
# File 'lib/local_bus/message.rb', line 51

def created_at
  metadata[:created_at]
end
#deconstruct_keys(keys) ⇒ Object
Allows pattern matching on message attributes
# File 'lib/local_bus/message.rb', line 96

def deconstruct_keys(keys)
  keys.any? ? to_h.slice(*keys) : to_h
end
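As an illustration of what `deconstruct_keys` enables, the sketch below matches a message with `case`/`in`. `SketchMessage` is a hypothetical stand-in that copies only the `to_h` and `deconstruct_keys` behavior shown above, so the example runs without the gem; the topic and payload values are made up.

```ruby
# Hypothetical stand-in for LocalBus::Message, just enough for pattern matching.
class SketchMessage
  def initialize(topic, **payload)
    @metadata = {topic: topic.to_s, payload: payload}.freeze
  end

  def to_h
    @metadata
  end

  # Same body as the documented method.
  def deconstruct_keys(keys)
    keys.any? ? to_h.slice(*keys) : to_h
  end
end

message = SketchMessage.new("user.created", user_id: 123)

description =
  case message
  in {topic: "user.created", payload: {user_id: Integer => user_id}}
    "created user #{user_id}"
  in {topic: String => topic}
    "unhandled topic #{topic}"
  end

description # => "created user 123"
```

One caveat worth checking against the installed version: Ruby passes `nil` as `keys` when a pattern contains `**rest`, so with the method body as listed such a pattern would call `any?` on `nil`. Patterns that name their keys explicitly, as above, avoid this.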
#id ⇒ Object
Unique identifier for the message
# File 'lib/local_bus/message.rb', line 33

def id
  metadata[:id]
end
#payload ⇒ Object
Message payload
# File 'lib/local_bus/message.rb', line 45

def payload
  metadata[:payload]
end
#subscribers ⇒ Object
Blocks and waits for the message to process, then returns all subscribers

# File 'lib/local_bus/message.rb', line 84

def subscribers
  wait
  publication.subscribers
end
#thread_id ⇒ Object
ID of the thread that created the message
# File 'lib/local_bus/message.rb', line 57

def thread_id
  metadata[:thread_id]
end
#timeout ⇒ Object
Timeout for message processing (in seconds)
# File 'lib/local_bus/message.rb', line 63

def timeout
  metadata[:timeout]
end
#topic ⇒ Object
Message topic
# File 'lib/local_bus/message.rb', line 39

def topic
  metadata[:topic]
end
#wait(interval: 0.1) ⇒ Object
Blocks and waits for the message to process
# File 'lib/local_bus/message.rb', line 70

def wait(interval: 0.1)
  @timers ||= Timers::Group.new.tap { _1.every(interval) {} }
  loop do
    break if publication
    @timers.wait
  end
  publication&.wait
ensure
  @timers&.cancel
  @timers = nil
end
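The interplay between `#wait`, `#subscribers`, and a publication that is attached later can be sketched as follows. This is a simplified model, not the library's implementation: `SketchMessage` and `SketchPublication` are hypothetical stand-ins, `Kernel#sleep` replaces the `Timers::Group` polling, and the subscriber names are invented.

```ruby
# Hypothetical stand-in for the publication; the real one wraps an Async barrier.
class SketchPublication
  attr_reader :subscribers

  def initialize(subscribers)
    @subscribers = subscribers
  end

  # Nothing to wait for in this sketch; the real object waits on its barrier.
  def wait
    self
  end
end

# Hypothetical stand-in for the message, keeping only the waiting logic.
class SketchMessage
  attr_accessor :publication

  # Poll at the given interval until a publication is attached, then wait on it.
  def wait(interval: 0.1)
    sleep(interval) until publication
    publication.wait
  end

  def subscribers
    wait
    publication.subscribers
  end
end

message = SketchMessage.new

# Simulate a bus attaching the publication slightly later from another thread.
publisher = Thread.new do
  sleep 0.05
  message.publication = SketchPublication.new([:audit_log, :mailer])
end

handled_by = message.subscribers # blocks until the publication is attached
publisher.join
```

In this sketch the polling loop exits only once `publication` is set, so a message that never receives one would block indefinitely. The listed loop has no deadline of its own either, so callers who cannot guarantee a publication may want to apply their own timeout.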