Class: StagedEvent::GooglePubSub::Subscriber

Inherits:
Subscriber
  • Object
show all
Includes:
Technologic
Defined in:
lib/staged_event/google_pub_sub/subscriber.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Subscriber

Returns a new instance of Subscriber.

Raises:

  • (ArgumentError)


8
9
10
11
12
# File 'lib/staged_event/google_pub_sub/subscriber.rb', line 8

# Builds the Pub/Sub client and verifies that a usable callback is available.
#
# The client is obtained from Helper.new_google_pubsub and stored in
# @google_pubsub before the callback is checked.
#
# @raise [ArgumentError] if event_received_callback does not respond to #call
def initialize
  @google_pubsub = Helper.new_google_pubsub
  return if event_received_callback.respond_to?(:call)

  raise ArgumentError, "event_received_callback is undefined"
end

Instance Method Details

#receive_events ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
# File 'lib/staged_event/google_pub_sub/subscriber.rb', line 14

# Starts one thread per entry in subscription_ids, each running
# receive_events_from_subscription for its id, then blocks until every
# thread has finished.
#
# @return [Array<Thread>] the threads that were started and joined
def receive_events
  workers = subscription_ids.map do |id|
    # Hand the id to the thread as an argument so each worker owns its copy.
    Thread.new(id) { |subscription_id| receive_events_from_subscription(subscription_id) }
  end

  workers.each(&:join)
end

#receive_events_from_subscription(subscription_id) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/staged_event/google_pub_sub/subscriber.rb', line 26

# Continuously pulls messages from one subscription, passes each message's
# data to event_received_callback, and acknowledges the message once the
# callback has returned.
#
# Any StandardError (failed lookup, failed pull, callback error, failed
# acknowledge) is logged and the whole sequence is started again, including
# the subscription lookup. Retries never stop, but they are separated by a
# delay that doubles after each consecutive failure (1s up to 30s) and drops
# back to 1s as soon as a pull succeeds, so a persistent outage does not turn
# into a busy loop that floods the log.
#
# A message whose callback raises is not acknowledged.
#
# @param subscription_id [Object] identifier passed to google_pubsub.subscription
# @return [void] does not return under normal operation
def receive_events_from_subscription(subscription_id)
  initial_retry_delay = 1
  max_retry_delay = 30
  # Lives outside the begin block so it survives `retry`.
  retry_delay = initial_retry_delay

  begin
    subscription = google_pubsub.subscription(subscription_id)
    # NOTE(review): google-cloud-pubsub documents a nil return when the
    # subscription does not exist — confirm for the client version in use.
    # Raising here gives a readable log entry instead of a NoMethodError on nil.
    raise ArgumentError, "subscription #{subscription_id} was not found" if subscription.nil?

    loop do
      received_messages = subscription.pull(immediate: false)
      # A successful pull means the subscription is reachable again.
      retry_delay = initial_retry_delay

      received_messages.each do |received_message|
        event_received_callback.call(received_message.data)
        # Acknowledge only after the callback completed without raising.
        received_message.acknowledge!
      end
    end
  rescue StandardError => exception
    error :subscription_failed, subscription_id: subscription_id, exception: exception.message
    sleep(retry_delay)
    retry_delay = [retry_delay * 2, max_retry_delay].min
    retry
  end
end