Class: JetstreamBridge::TestHelpers::MockNats::InMemoryStorage
- Inherits:
-
Object
- Object
- JetstreamBridge::TestHelpers::MockNats::InMemoryStorage
- Defined in:
- lib/jetstream_bridge/test_helpers/mock_nats.rb
Instance Attribute Summary collapse
-
#consumers ⇒ Object
readonly
Returns the value of attribute consumers.
-
#messages ⇒ Object
readonly
Returns the value of attribute messages.
-
#streams ⇒ Object
readonly
Returns the value of attribute streams.
-
#subscriptions ⇒ Object
readonly
Returns the value of attribute subscriptions.
Instance Method Summary collapse
- #ack_message(message_ref) ⇒ Object
- #add_consumer(stream_name, config) ⇒ Object
- #add_stream(config) ⇒ Object
- #create_subscription(subject, durable_name, options) ⇒ Object
- #delete_consumer(stream_name, consumer_name) ⇒ Object
- #delete_stream(name) ⇒ Object
- #fetch_messages(subject, durable_name, batch_size, _timeout) ⇒ Object
- #find_consumer(stream_name, durable_name) ⇒ Object
- #find_stream(name) ⇒ Object
- #find_stream_for_subject(_subject) ⇒ Object
-
#initialize ⇒ InMemoryStorage
constructor
A new instance of InMemoryStorage.
- #nak_message(_message_ref, delay: nil) ⇒ Object
- #publish(subject, data, header) ⇒ Object
- #reset! ⇒ Object
- #term_message(message_ref) ⇒ Object
Constructor Details
#initialize ⇒ InMemoryStorage
177 178 179 180 181 182 183 184 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 177 def initialize @streams = {} @consumers = {} @messages = [] @subscriptions = {} @sequence_counter = 0 @mutex = Mutex.new end |
Instance Attribute Details
#consumers ⇒ Object (readonly)
Returns the value of attribute consumers.
175 176 177 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 175 def consumers @consumers end |
#messages ⇒ Object (readonly)
Returns the value of attribute messages.
175 176 177 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 175 def @messages end |
#streams ⇒ Object (readonly)
Returns the value of attribute streams.
175 176 177 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 175 def streams @streams end |
#subscriptions ⇒ Object (readonly)
Returns the value of attribute subscriptions.
175 176 177 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 175 def subscriptions @subscriptions end |
Instance Method Details
#ack_message(message_ref) ⇒ Object
270 271 272 273 274 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 270 def () @mutex.synchronize do @messages.delete() end end |
#add_consumer(stream_name, config) ⇒ Object
319 320 321 322 323 324 325 326 327 328 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 319 def add_consumer(stream_name, config) @mutex.synchronize do durable_name = config[:durable_name] || config['durable_name'] @consumers[durable_name] = MockConsumer.new( name: durable_name, stream: stream_name, config: config ) end end |
#add_stream(config) ⇒ Object
290 291 292 293 294 295 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 290 def add_stream(config) @mutex.synchronize do name = config[:name] || config['name'] @streams[name] = MockStream.new(name, config) end end |
#create_subscription(subject, durable_name, options) ⇒ Object
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 213 def create_subscription(subject, durable_name, ) @mutex.synchronize do stream_name = [:stream] || find_stream_for_subject(subject)&.name || 'mock-stream' subscription = MockSubscription.new( subject: subject, durable_name: durable_name, storage: self, stream_name: stream_name, options: ) @subscriptions[durable_name] = subscription # Register consumer @consumers[durable_name] = MockConsumer.new( name: durable_name, stream: stream_name, config: ) subscription end end |
#delete_consumer(stream_name, consumer_name) ⇒ Object
330 331 332 333 334 335 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 330 def delete_consumer(stream_name, consumer_name) @mutex.synchronize do consumer = @consumers[consumer_name] @consumers.delete(consumer_name) if consumer&.stream == stream_name end end |
#delete_stream(name) ⇒ Object
297 298 299 300 301 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 297 def delete_stream(name) @mutex.synchronize do @streams.delete(name) end end |
#fetch_messages(subject, durable_name, batch_size, _timeout) ⇒ Object
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 238 def (subject, durable_name, batch_size, _timeout) @mutex.synchronize do consumer = @consumers[durable_name] stream_name = consumer&.stream || 'mock-stream' # Find messages matching the subject matching = @messages.select do |msg| msg[:subject] == subject && msg[:delivery_count] < max_deliver_for(durable_name) end # Take up to batch_size messages to_deliver = matching.first(batch_size) # Increment delivery count and return MockMessage objects to_deliver.map do |msg| msg[:delivery_count] += 1 MockMessage.new( subject: msg[:subject], data: msg[:data], header: msg[:header], sequence: msg[:sequence], stream: stream_name, consumer: durable_name, num_delivered: msg[:delivery_count], storage: self, message_ref: msg ) end end end |
#find_consumer(stream_name, durable_name) ⇒ Object
311 312 313 314 315 316 317 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 311 def find_consumer(stream_name, durable_name) consumer = @consumers[durable_name] return nil unless consumer return nil unless consumer.stream == stream_name consumer end |
#find_stream(name) ⇒ Object
303 304 305 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 303 def find_stream(name) @streams[name] end |
#find_stream_for_subject(_subject) ⇒ Object
307 308 309 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 307 def find_stream_for_subject(_subject) @streams.values.first # Simplified: return first stream end |
#nak_message(_message_ref, delay: nil) ⇒ Object
276 277 278 279 280 281 282 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 276 def (, delay: nil) @mutex.synchronize do # Message stays in queue for redelivery # Note: delivery_count was already incremented during fetch # We don't decrement it here as it represents actual delivery attempts end end |
#publish(subject, data, header) ⇒ Object
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 186 def publish(subject, data, header) @mutex.synchronize do event_id = header['nats-msg-id'] || SecureRandom.uuid # Check for duplicate duplicate = @messages.any? { |msg| msg[:header]['nats-msg-id'] == event_id } unless duplicate @sequence_counter += 1 @messages << { subject: subject, data: data, header: header, sequence: @sequence_counter, timestamp: Time.now, delivery_count: 0 } end MockAck.new( duplicate: duplicate, sequence: @sequence_counter, stream: find_stream_for_subject(subject)&.name || 'mock-stream' ) end end |
#reset! ⇒ Object
337 338 339 340 341 342 343 344 345 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 337 def reset! @mutex.synchronize do @streams.clear @consumers.clear @messages.clear @subscriptions.clear @sequence_counter = 0 end end |
#term_message(message_ref) ⇒ Object
284 285 286 287 288 |
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 284 def () @mutex.synchronize do @messages.delete() end end |