Class: Fluent::Rdkafka2Output::EnqueueRate

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_rdkafka2.rb

Defined Under Namespace

Classes: LimitExceeded

Instance Method Summary collapse

Constructor Details

#initialize(limit_bytes_per_second) ⇒ EnqueueRate

Returns a new instance of EnqueueRate.



123
124
125
126
127
128
129
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 123

# Sets up an empty accounting window that begins at the current
# Fluent::Clock reading.
#
# @param limit_bytes_per_second [Numeric, nil] byte budget per window;
#   the other methods of this class return immediately when this is nil.
def initialize(limit_bytes_per_second)
  @limit_bytes_per_second = limit_bytes_per_second
  # Bytes charged to the window that started at @start_clock.
  @bytes_per_second = 0
  # Latest recorded enqueue per thread, keyed by the Thread object.
  @commits = {}
  @start_clock = Fluent::Clock.now
  # Guards every read/write of the counters and of @commits.
  @mutex = Mutex.new
end

Instance Method Details

#raise_if_limit_exceeded(bytes_to_enqueue) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 131

# Charges +bytes_to_enqueue+ to the current one-second window and raises
# when the window's total goes over the configured limit.
#
# The charge is recorded (both in the running total and as the calling
# thread's latest commit) *before* the limit check, so it is still counted
# when LimitExceeded is raised; #revert is what takes it back out.
#
# @param bytes_to_enqueue [Numeric] size to charge to the window
# @return [nil, Numeric] nil when limiting is disabled or the charge fits;
#   +bytes_to_enqueue+ when this call started a new window
# @raise [LimitExceeded] built with the clock value at which the current
#   window ends (+@start_clock + 1.0+)
def raise_if_limit_exceeded(bytes_to_enqueue)
  return if @limit_bytes_per_second.nil?

  @mutex.synchronize do
    now = Fluent::Clock.now
    @commits[Thread.current] = { clock: now, bytesize: bytes_to_enqueue }
    @bytes_per_second += bytes_to_enqueue

    elapsed = now - @start_clock
    if elapsed < 1.0
      over_budget = @bytes_per_second > @limit_bytes_per_second
      raise LimitExceeded.new(@start_clock + 1.0) if over_budget
    else
      # The window has expired: restart it at this enqueue and count only
      # this enqueue. No limit check is made on this path.
      @start_clock = now
      @bytes_per_second = bytes_to_enqueue
    end
  end
end

#revert ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 154

# Takes back the calling thread's most recently recorded enqueue.
#
# The recorded byte count is subtracted from the running total only when
# the commit was made at or after the start of the current window; a commit
# from an earlier window is no longer part of the total, so it is just
# forgotten. Does nothing when limiting is disabled or when the calling
# thread has no recorded commit.
#
# The thread's entry is removed from @commits rather than overwritten with
# nil, so the hash does not keep a key (and with it a reference to the
# Thread object) for a commit that has been undone.
#
# NOTE(review): presumably called after rescuing LimitExceeded from
# #raise_if_limit_exceeded — confirm against the caller.
#
# @return [nil]
def revert
  return if @limit_bytes_per_second.nil?

  @mutex.synchronize do
    commit = @commits.delete(Thread.current)
    next unless commit && commit[:clock]

    @bytes_per_second -= commit[:bytesize] if commit[:clock] >= @start_clock
  end
  nil
end