Class: JetstreamBridge::TestHelpers::MockNats::InMemoryStorage

Inherits:
Object
  • Object
show all
Defined in:
lib/jetstream_bridge/test_helpers/mock_nats.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ InMemoryStorage



177
178
179
180
181
182
183
184
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 177

# Builds an empty store: no streams, consumers, subscriptions or messages,
# a sequence counter at zero, and the mutex that the mutating methods of
# this class synchronize on.
def initialize
  # Registries, all starting empty.
  @streams = {}
  @consumers = {}
  @subscriptions = {}

  # Stored message entries and the last sequence number handed out.
  @messages = []
  @sequence_counter = 0

  @mutex = Mutex.new
end

Instance Attribute Details

#consumers ⇒ Object (readonly)

Returns the value of attribute consumers.



175
176
177
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 175

# Returns the consumer registry held in @consumers.
#
# The object itself is returned, not a copy, so changes made through it
# are visible to this instance and are not guarded by @mutex.
#
# @return [Hash] registered consumers
def consumers
  @consumers
end

#messages ⇒ Object (readonly)

Returns the value of attribute messages.



175
176
177
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 175

# Returns the list of stored message entries held in @messages.
#
# The object itself is returned, not a copy, so changes made through it
# are visible to this instance and are not guarded by @mutex.
#
# @return [Array<Hash>] stored message entries
def messages
  @messages
end

#streams ⇒ Object (readonly)

Returns the value of attribute streams.



175
176
177
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 175

# Returns the stream registry held in @streams.
#
# The object itself is returned, not a copy, so changes made through it
# are visible to this instance and are not guarded by @mutex.
#
# @return [Hash] registered streams
def streams
  @streams
end

#subscriptions ⇒ Object (readonly)

Returns the value of attribute subscriptions.



175
176
177
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 175

# Returns the subscription registry held in @subscriptions.
#
# The object itself is returned, not a copy, so changes made through it
# are visible to this instance and are not guarded by @mutex.
#
# @return [Hash] registered subscriptions
def subscriptions
  @subscriptions
end

Instance Method Details

#ack_message(message_ref) ⇒ Object



270
271
272
273
274
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 270

# Acknowledges a message by dropping its entry from the stored messages.
#
# @param message_ref [Hash] the stored entry to remove
# @return [Hash, nil] the removed entry, or nil when it was not stored
def ack_message(message_ref)
  @mutex.synchronize { @messages.delete(message_ref) }
end

#add_consumer(stream_name, config) ⇒ Object



319
320
321
322
323
324
325
326
327
328
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 319

# Registers a consumer for +stream_name+, keyed by its durable name.
#
# The durable name is read from +config+ under :durable_name, falling back
# to the string key 'durable_name'. An existing entry under the same name
# is replaced.
#
# @param stream_name [Object] stream the consumer is attached to
# @param config [Hash] consumer configuration
# @return [MockConsumer] the consumer that was stored
def add_consumer(stream_name, config)
  @mutex.synchronize do
    key = config[:durable_name] || config['durable_name']
    consumer = MockConsumer.new(name: key, stream: stream_name, config: config)
    @consumers[key] = consumer
  end
end

#add_stream(config) ⇒ Object



290
291
292
293
294
295
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 290

# Registers a stream built from +config+, keyed by its name.
#
# The name is read from +config+ under :name, falling back to the string
# key 'name'. An existing stream under the same name is replaced.
#
# @param config [Hash] stream configuration
# @return [MockStream] the stream that was stored
def add_stream(config)
  @mutex.synchronize do
    stream_name = config[:name] || config['name']
    MockStream.new(stream_name, config).tap { |stream| @streams[stream_name] = stream }
  end
end

#create_subscription(subject, durable_name, options) ⇒ Object



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 213

# Creates a subscription for +subject+ and registers both it and a matching
# consumer under +durable_name+.
#
# The stream name is taken from options[:stream] when present, otherwise
# from whatever find_stream_for_subject returns, otherwise 'mock-stream'.
#
# @param subject [Object] subject the subscription listens on
# @param durable_name [Object] key for the subscription and its consumer
# @param options [Hash] subscription options, also used as consumer config
# @return [MockSubscription] the newly created subscription
def create_subscription(subject, durable_name, options)
  @mutex.synchronize do
    target_stream = options[:stream] || find_stream_for_subject(subject)&.name || 'mock-stream'

    MockSubscription.new(
      subject: subject,
      durable_name: durable_name,
      storage: self,
      stream_name: target_stream,
      options: options
    ).tap do |subscription|
      @subscriptions[durable_name] = subscription

      # The consumer is registered only after the subscription is stored.
      @consumers[durable_name] = MockConsumer.new(
        name: durable_name,
        stream: target_stream,
        config: options
      )
    end
  end
end

#delete_consumer(stream_name, consumer_name) ⇒ Object



330
331
332
333
334
335
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 330

# Removes the consumer registered as +consumer_name+, but only when it is
# attached to +stream_name+.
#
# @param stream_name [Object] stream the consumer must belong to
# @param consumer_name [Object] key the consumer was registered under
# @return [Object, nil] the removed consumer, or nil when nothing was removed
def delete_consumer(stream_name, consumer_name)
  @mutex.synchronize do
    existing = @consumers[consumer_name]
    next unless existing&.stream == stream_name

    @consumers.delete(consumer_name)
  end
end

#delete_stream(name) ⇒ Object



297
298
299
300
301
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 297

# Removes the stream registered under +name+. Only the stream registry is
# touched; consumers and messages are left as they are.
#
# @param name [Object] key the stream was registered under
# @return [Object, nil] the removed stream, or nil when none was registered
def delete_stream(name)
  @mutex.synchronize { @streams.delete(name) }
end

#fetch_messages(subject, durable_name, batch_size, _timeout) ⇒ Object



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 238

# Delivers up to +batch_size+ stored messages whose subject equals +subject+.
#
# A message is eligible while its delivery count is below the value
# returned by max_deliver_for(durable_name) (presumably the consumer's
# max-deliver limit; that helper is defined outside this view). Every
# delivered entry has its delivery count incremented before it is wrapped.
#
# @param subject [Object] subject to match exactly (no wildcard handling)
# @param durable_name [Object] consumer the messages are delivered to
# @param batch_size [Integer] maximum number of messages to return
# @param _timeout [Object] accepted but unused
# @return [Array<MockMessage>] delivered messages, in stored order
def fetch_messages(subject, durable_name, batch_size, _timeout)
  @mutex.synchronize do
    # Unknown consumers fall back to the default stream name.
    stream_name = @consumers[durable_name]&.stream || 'mock-stream'

    eligible = @messages.select do |entry|
      entry[:subject] == subject && entry[:delivery_count] < max_deliver_for(durable_name)
    end

    eligible.first(batch_size).map do |entry|
      # Count this delivery before exposing it as num_delivered.
      entry[:delivery_count] += 1

      MockMessage.new(
        subject: entry[:subject],
        data: entry[:data],
        header: entry[:header],
        sequence: entry[:sequence],
        stream: stream_name,
        consumer: durable_name,
        num_delivered: entry[:delivery_count],
        storage: self,
        message_ref: entry
      )
    end
  end
end

#find_consumer(stream_name, durable_name) ⇒ Object



311
312
313
314
315
316
317
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 311

# Looks up the consumer registered as +durable_name+ and returns it only
# when it is attached to +stream_name+.
#
# @param stream_name [Object] stream the consumer must belong to
# @param durable_name [Object] key the consumer was registered under
# @return [Object, nil] the matching consumer, or nil
def find_consumer(stream_name, durable_name)
  candidate = @consumers[durable_name]
  candidate if candidate && candidate.stream == stream_name
end

#find_stream(name) ⇒ Object



303
304
305
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 303

# Looks up a stream in the stream registry.
#
# @param name [Object] key the stream was registered under
# @return [Object, nil] the stream stored under +name+, or nil when absent
def find_stream(name)
  @streams[name]
end

#find_stream_for_subject(_subject) ⇒ Object



307
308
309
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 307

# Simplified subject-to-stream lookup: the subject is ignored and the
# first registered stream is returned.
#
# This method does not take @mutex itself; publish and create_subscription
# call it while already holding the lock.
#
# @param _subject [Object] ignored
# @return [Object, nil] the first registered stream, or nil when there is none
def find_stream_for_subject(_subject)
  @streams.each_value.first
end

#nak_message(_message_ref, delay: nil) ⇒ Object



276
277
278
279
280
281
282
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 276

# Negative-acknowledges a message. Nothing is changed: the entry stays in
# the stored messages so a later fetch can deliver it again.
#
# @param _message_ref [Hash] the stored entry (unused)
# @param delay [Object, nil] accepted but not acted upon
# @return [nil]
def nak_message(_message_ref, delay: nil)
  @mutex.synchronize do
    # The delivery count is deliberately left as it is: fetch already
    # incremented it, and it records delivery attempts that really happened.
    nil
  end
end

#publish(subject, data, header) ⇒ Object



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 186

# Stores a message for +subject+ and returns an ack describing the outcome.
#
# Deduplication is keyed on the 'nats-msg-id' header. When a stored message
# already carries the same id, nothing is stored and the ack is flagged as a
# duplicate with the sequence of the originally stored message. Messages
# published without an id (or with a nil header) are never duplicates.
#
# Only messages still held in the store are considered; entries removed by
# ack/term no longer take part in duplicate detection.
#
# @param subject [Object] subject the message is published on
# @param data [Object] message payload
# @param header [Hash, nil] message headers; may carry 'nats-msg-id'
# @return [MockAck] ack carrying duplicate flag, sequence and stream name
def publish(subject, data, header)
  @mutex.synchronize do
    msg_id = header && header['nats-msg-id']

    # Without an explicit id there is nothing to compare against.
    original =
      if msg_id
        @messages.find { |msg| msg[:header] && msg[:header]['nats-msg-id'] == msg_id }
      end
    duplicate = !original.nil?

    unless duplicate
      @sequence_counter += 1
      @messages << {
        subject: subject,
        data: data,
        header: header,
        sequence: @sequence_counter,
        timestamp: Time.now,
        delivery_count: 0
      }
    end

    MockAck.new(
      duplicate: duplicate,
      # A duplicate reports the sequence of the message it collided with,
      # not the most recently assigned one.
      sequence: duplicate ? original[:sequence] : @sequence_counter,
      stream: find_stream_for_subject(subject)&.name || 'mock-stream'
    )
  end
end

#reset! ⇒ Object



337
338
339
340
341
342
343
344
345
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 337

# Returns the store to its initial state: every registry and the message
# list are emptied in place and the sequence counter goes back to zero.
#
# The collections are cleared rather than replaced, so references obtained
# earlier through the readers see the emptied contents.
def reset!
  @mutex.synchronize do
    [@streams, @consumers, @messages, @subscriptions].each(&:clear)
    @sequence_counter = 0
  end
end

#term_message(message_ref) ⇒ Object



284
285
286
287
288
# File 'lib/jetstream_bridge/test_helpers/mock_nats.rb', line 284

# Terminates a message by dropping its entry from the stored messages, so
# it is not delivered again.
#
# @param message_ref [Hash] the stored entry to remove
# @return [Hash, nil] the removed entry, or nil when it was not stored
def term_message(message_ref)
  @mutex.synchronize { @messages.delete(message_ref) }
end