Class: WaterDrop::Producer

Inherits:
Object
  • Object
show all
Includes:
Async, Buffer, Sync
Defined in:
lib/water_drop/producer.rb,
lib/water_drop/producer/sync.rb,
lib/water_drop/producer/async.rb,
lib/water_drop/producer/buffer.rb,
lib/water_drop/producer/status.rb,
lib/water_drop/producer/builder.rb,
lib/water_drop/producer/dummy_client.rb,
lib/water_drop/producer/statistics_decorator.rb

Overview

Main WaterDrop messages producer

Defined Under Namespace

Modules: Async, Buffer, Sync

Classes: Builder, DummyClient, StatisticsDecorator, Status

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Buffer

#buffer, #buffer_many, #flush_async, #flush_sync

Methods included from Async

#produce_async, #produce_many_async

Methods included from Sync

#produce_many_sync, #produce_sync

Constructor Details

#initialize(&block) ⇒ Producer

Creates a not-yet-configured instance of the producer

Parameters:

  • block (Proc)

    configuration block


24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/water_drop/producer.rb', line 24

# Builds a producer that is not configured yet. When a configuration block is supplied,
# the producer is configured immediately by forwarding the block to #setup.
#
# @param block [Proc] configuration block
def initialize(&block)
  # Fresh status object and the internal messages buffer
  @status = Status.new
  @messages = Concurrent::Array.new

  # Separate locks: @connecting_mutex guards client creation in #client and
  # @closing_mutex guards #close. @buffer_mutex is not used in this file -
  # presumably it is used by the Buffer module (confirm there).
  @closing_mutex = Mutex.new
  @connecting_mutex = Mutex.new
  @buffer_mutex = Mutex.new

  setup(&block) if block
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns dry-configurable config object.

Returns:

  • (Object)

    dry-configurable config object


19
20
21
# File 'lib/water_drop/producer.rb', line 19

# Reader for the producer configuration. The value is assigned in #setup, so it is nil
# for a producer that has not been configured yet.
#
# @return [Object] dry-configurable config object
def config
  @config
end

#id ⇒ String (readonly)

Returns uuid of the current producer.

Returns:

  • (String)

    uuid of the current producer


11
12
13
# File 'lib/water_drop/producer.rb', line 11

# Reader for the producer id. The value is copied from the configuration in #setup, so it
# is nil for a producer that has not been configured yet.
#
# @return [String] uuid of the current producer
def id
  @id
end

#messages ⇒ Concurrent::Array (readonly)

Returns internal messages buffer.

Returns:

  • (Concurrent::Array)

    internal messages buffer


15
16
17
# File 'lib/water_drop/producer.rb', line 15

# Reader for the internal messages buffer that is created in the constructor.
#
# @return [Concurrent::Array] internal messages buffer
def messages
  @messages
end

#monitor ⇒ Object (readonly)

Returns monitor we want to use.

Returns:

  • (Object)

    monitor we want to use


17
18
19
# File 'lib/water_drop/producer.rb', line 17

# Reader for the monitor. The value is taken from the configuration in #setup, so it is
# nil for a producer that has not been configured yet.
#
# @return [Object] monitor we want to use
def monitor
  @monitor
end

#status ⇒ Status (readonly)

Returns producer status object.

Returns:

  • (Status)

    producer status object


13
14
15
# File 'lib/water_drop/producer.rb', line 13

# Reader for the status object that is created in the constructor.
#
# @return [Status] producer status object
def status
  @status
end

Instance Method Details

#client ⇒ Rdkafka::Producer

Note:

The client is lazily initialized, which also accounts for a fork that can happen at any time.

Note:

It is not recommended to fork a producer that is already in use, so when bootstrapping a cluster it is much better to fork producers that are configured but not yet used.

Returns raw rdkafka producer.

Returns:

  • (Rdkafka::Producer)

    raw rdkafka producer

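Raises:

  • (Errors::ProducerNotConfiguredError)

    when the producer has not been configured yet

  • (Errors::ProducerUsedInParentProcess)

    when a producer whose status is already connected is used from a different process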


58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/water_drop/producer.rb', line 58

# Returns the raw client, building it on first use.
#
# The client is only reused when it was built by the current process (tracked via @pid).
# Creation happens under @connecting_mutex and the "already built?" check is repeated
# inside the lock, so a thread that waited on the lock reuses what the other thread built.
#
# @return [Rdkafka::Producer] raw rdkafka producer
# @raise [Errors::ProducerNotConfiguredError] when the producer was not configured
# @raise [Errors::ProducerUsedInParentProcess] when the status is already connected but
#   there is no client that belongs to the current process
def client
  return @client if @client && @pid == Process.pid

  # A producer that was never configured must not hand out a client reference
  raise Errors::ProducerNotConfiguredError, id if @status.initial?

  @connecting_mutex.synchronize do
    # Re-check under the lock: another thread may have finished building the client
    unless @client && @pid == Process.pid
      # Using a producer from a fork is an error once it is connected to Kafka.
      # Forking producers is allowed only before they are used
      raise Errors::ProducerUsedInParentProcess, Process.pid if @status.connected?

      # Drop every finalizer first, so that in case of a fork the finalizers registered by
      # the parent process don't leak
      ObjectSpace.undefine_finalizer(self)
      # The finalizer gives us a graceful shutdown for producers nobody remembered to
      # close. Keep in mind that it blocks GC from removing a no longer used producer
      # unless it was closed properly
      ObjectSpace.define_finalizer(self, proc { close })

      @pid = Process.pid
      @client = Builder.new.call(self, @config)
      @status.connected!
    end
  end

  @client
end

#close ⇒ Object

Flushes the buffers in a sync way and closes the producer


89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/water_drop/producer.rb', line 89

# Flushes the buffer (via `flush(false)`) and closes the producer.
#
# The whole operation runs under the closing mutex and does nothing when the status is
# not active. The work is wrapped in the 'producer.closed' instrumentation event.
#
# The underlying client is closed only when one was actually built. Going through the
# lazy #client reader without an existing client would build a brand new one just to
# close it and would register the finalizer again right after it was removed here.
def close
  @closing_mutex.synchronize do
    return unless @status.active?

    @monitor.instrument(
      'producer.closed',
      producer: self
    ) do
      @status.closing!

      # No need for auto-gc if everything got closed by us
      # This should be used only in case a producer was not closed properly and forgotten
      ObjectSpace.undefine_finalizer(self)

      # Flush has its own buffer mutex but even if it is blocked, flushing can still happen
      # as we close the client after the flushing (even if blocked by the mutex)
      flush(false)

      # We should not close the client in several threads the same time
      # It is safe to run it several times but not exactly the same moment
      # We still go through #client (instead of using @client directly) so its checks,
      # like the fork detection, keep applying when a client does exist
      client.close if @client

      @status.closed!
    end
  end
end

#ensure_active! ⇒ Object

Ensures that we don't run any operations when the producer is not configured or when it was already closed


118
119
120
121
122
123
124
125
126
# File 'lib/water_drop/producer.rb', line 118

# Guards operations against a producer that is not usable: returns quietly when the status
# is active and otherwise raises the error that matches the current status.
#
# @raise [Errors::ProducerNotConfiguredError] when the producer is still in the initial state
# @raise [Errors::ProducerClosedError] when the producer is closing or already closed
# @raise [Errors::StatusInvalidError] when the status is none of the above
def ensure_active!
  return if @status.active?

  if @status.initial?
    raise Errors::ProducerNotConfiguredError, id
  elsif @status.closing? || @status.closed?
    raise Errors::ProducerClosedError, id
  else
    # This should never happen
    raise Errors::StatusInvalidError, [id, @status.to_s]
  end
end

#setup(&block) ⇒ Object

Sets up the whole configuration and initializes all that is needed

Parameters:

  • block (Block)

    configuration block

Raises:

  • (Errors::ProducerAlreadyConfiguredError)

    when the producer is no longer in its initial state


39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/water_drop/producer.rb', line 39

# Configures the producer: builds the configuration from the given block and derives the
# id, the monitor and the message contract from it. Finishes by marking the status as
# configured.
#
# @param block [Block] configuration block
# @raise [Errors::ProducerAlreadyConfiguredError] when the status is no longer initial
def setup(&block)
  raise Errors::ProducerAlreadyConfiguredError, id unless @status.initial?

  configurator = Config.new
  @config = configurator.setup(&block).config

  # Values we read often are copied out of the config
  @id = @config.id
  @monitor = @config.monitor

  payload_limit = @config.max_payload_size
  @contract = Contracts::Message.new(max_payload_size: payload_limit)

  @status.configured!
end

#validate_message!(message) ⇒ Object

Ensures that the message we want to send out to Kafka is actually valid and that it can be sent there

Parameters:

  • message (Hash)

    message we want to send

Raises:

  • (WaterDrop::Errors::MessageInvalidError)

132
133
134
135
136
137
138
139
140
# File 'lib/water_drop/producer.rb', line 132

# Runs the message through the contract built in #setup and raises when it is rejected.
#
# @param message [Hash] message we want to send
# @raise [Errors::MessageInvalidError] when the contract result is not a success; the
#   error carries the validation errors hash together with the offending message
def validate_message!(message)
  validation = @contract.call(message)
  return if validation.success?

  details = [validation.errors.to_h, message]
  raise Errors::MessageInvalidError, details
end