Module: JetstreamBridge::TestHelpers::IntegrationHelpers

Defined in:
lib/jetstream_bridge/test_helpers/integration_helpers.rb

Overview

Integration helpers to exercise the mock NATS storage end-to-end.

Instance Method Summary

Instance Method Details

#consume_events(batch_size: 10) {|event| ... } ⇒ Array<Hash>

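Consume up to `batch_size` events from mock storage, passing each one to the optional block, and return the recorded consumed events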

Yields:

  • (event)

    Block to handle each event



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/jetstream_bridge/test_helpers/integration_helpers.rb', line 32

# Replays messages held in the mock storage as events.
#
# Takes the first +batch_size+ stored messages, wraps each one in an
# OpenStruct shaped like a NATS message (subject, data, header, metadata),
# converts it with Models::Event.from_nats_message, hands the event to the
# optional block, and records the event's hash form via
# TestHelpers.record_consumed_event.
#
# @param batch_size [Integer] maximum number of stored messages to replay
# @yield [event] optional block invoked once per event
# @return [Array<Hash>] the value of TestHelpers.consumed_events
def consume_events(batch_size: 10, &handler)
  helpers = JetstreamBridge::TestHelpers
  batch = helpers.mock_storage.messages.first(batch_size)

  batch.each do |raw|
    # Stream and consumer names are fixed placeholders for the mock.
    metadata = OpenStruct.new(
      sequence: OpenStruct.new(stream: raw[:sequence]),
      num_delivered: raw[:delivery_count],
      stream: 'test-stream',
      consumer: 'test-consumer'
    )
    nats_message = OpenStruct.new(
      subject: raw[:subject],
      data: raw[:data],
      header: raw[:header],
      metadata: metadata
    )
    event = JetstreamBridge::Models::Event.from_nats_message(nats_message)

    # The block runs before the event is recorded as consumed.
    handler.call(event) if handler
    helpers.record_consumed_event(event.to_h)
  end

  helpers.consumed_events
end

#publish_and_wait(timeout: 1, **event_attrs) ⇒ Models::PublishResult

Publish an event and wait for it to appear in mock storage

Raises:

  • (Timeout::Error)

    If event doesn’t appear within timeout



13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/jetstream_bridge/test_helpers/integration_helpers.rb', line 13

# Publishes an event and polls the mock storage until a stored message
# carries the published event's id in its 'nats-msg-id' header.
#
# Storage is always inspected at least once, so an event that is already
# present is found even with +timeout: 0+.
#
# @param timeout [Numeric] maximum number of seconds to keep polling
# @param event_attrs [Hash] keyword arguments forwarded to JetstreamBridge.publish
# @return [Models::PublishResult] the value returned by JetstreamBridge.publish
# @raise [Timeout::Error] if no matching message appears before the timeout elapses
def publish_and_wait(timeout: 1, **event_attrs)
  require 'timeout' # loaded here only to make Timeout::Error available

  result = JetstreamBridge.publish(**event_attrs)

  # A monotonic clock keeps the wait immune to wall-clock adjustments.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  loop do
    # Re-read the storage on every poll in case it was replaced meanwhile.
    storage = JetstreamBridge::TestHelpers.mock_storage
    return result if storage.messages.any? { |m| m[:header]['nats-msg-id'] == result.event_id }

    if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      raise Timeout::Error,
            "event #{result.event_id.inspect} did not appear in mock storage within #{timeout}s"
    end

    sleep 0.01
  end
end

#wait_for_messages(count, timeout: 2) ⇒ Boolean

Wait until mock storage holds at least `count` messages; returns false if the timeout elapses first



63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/jetstream_bridge/test_helpers/integration_helpers.rb', line 63

# Polls the mock storage until it holds at least +count+ messages.
#
# The message count is checked before the deadline, so a condition that is
# already satisfied yields +true+ even when +timeout+ is zero or negative.
#
# @param count [Integer] minimum number of stored messages to wait for
# @param timeout [Numeric] maximum number of seconds to keep polling
# @return [Boolean] true once enough messages are present, false on timeout
def wait_for_messages(count, timeout: 2)
  # A monotonic clock keeps the wait immune to wall-clock adjustments.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  loop do
    # Re-read the storage on every poll in case it was replaced meanwhile.
    storage = JetstreamBridge::TestHelpers.mock_storage
    return true if storage.messages.size >= count
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep 0.01
  end
end