Class: PubsubClient::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/pubsub_client/subscriber.rb

Constant Summary collapse

DEFAULT_CONCURRENCY =
8

Instance Method Summary collapse

Constructor Details

#initialize(subscription) ⇒ Subscriber

Returns a new instance of Subscriber.

Parameters:

  • subscription (Google::Cloud::PubSub::Subscription)


10
11
12
# File 'lib/pubsub_client/subscriber.rb', line 10

def initialize(subscription)
  @subscription = subscription
end

Instance Method Details

#listener(concurrency: DEFAULT_CONCURRENCY, auto_ack: true, &block) ⇒ Google::Cloud::PubSub::Subscriber

to remove it from the topic. Default is ‘true`.

Parameters:

  • concurrency (Integer) (defaults to: DEFAULT_CONCURRENCY)
    • The number of threads to run the subscriber with. Default is 8.

  • auto_ack (Boolean) (defaults to: true)
    • Flag to acknowledge the Pub/Sub message. A message must be acked

Returns:

  • (Google::Cloud::PubSub::Subscriber)


19
20
21
22
23
24
25
26
# File 'lib/pubsub_client/subscriber.rb', line 19

def listener(concurrency: DEFAULT_CONCURRENCY, auto_ack: true, &block)
  @listener ||= begin
    @subscription.listen(threads: { callback: concurrency }) do |received_message|
      yield received_message.data, received_message
      received_message.acknowledge! if auto_ack
    end
  end
end

#on_error(&block) ⇒ Object

Raises:



40
41
42
43
44
45
46
# File 'lib/pubsub_client/subscriber.rb', line 40

def on_error(&block)
  raise ConfigurationError, 'A listener must be configured' unless @listener

  @listener.on_error do |exception|
    yield exception
  end
end

#subscribeObject

Raises:



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/pubsub_client/subscriber.rb', line 28

def subscribe
  raise ConfigurationError, 'A listener must be configured' unless @listener

  begin
    @listener.start

    sleep
  rescue SignalException
    @listener.stop.wait!
  end
end