Class: PubsubClient::PublisherFactory

Inherits:
Object
  • Object
show all
Defined in:
lib/pubsub_client/publisher_factory.rb

Overview

Build and memoize the Publisher, accounting for GRPC’s requirements around forking.

Defined Under Namespace

Classes: Memo

Instance Method Summary collapse

Constructor Details

#initializePublisherFactory

Returns a new instance of PublisherFactory.



8
9
10
11
# File 'lib/pubsub_client/publisher_factory.rb', line 8

def initialize
  @mutex = Mutex.new
  @publishers = {}
end

Instance Method Details

#build(topic_name) ⇒ Publisher

Parameters:

  • topic_name (String)

Returns:



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/pubsub_client/publisher_factory.rb', line 15

def build(topic_name)
  # GRPC fails when attempting to use a connection created in a process that gets
  # forked with the message
  #
  #  "grpc cannot be used before and after forking"
  #
  # Also creating a new publsher incurs significant overhead as it connects to
  # PubSub.
  #
  # To prevent incurring overhead, memoize the publisher per process.
  return publishers[topic_name].publisher if publishers[topic_name]&.pid == current_pid

  # We are in a multi-threaded world and need to be careful not to build the publisher
  # in multiple threads. Lock the mutex so that only one thread can enter this block
  # at a time.
  @mutex.synchronize do
    # It's possible two threads made it to this point, but since we have a lock we
    # know that one will have built the publisher before the second is able to enter.
    # If we detect that case, then bail out so as to not rebuild the publisher.
    unless publishers[topic_name]&.pid == current_pid
      publishers[topic_name] = Memo.new(build_publisher(topic_name), Process.pid)
    end
  end

  publishers[topic_name].publisher
end