Class: Fluent::Rdkafka2Output::EnqueueRate
- Inherits: Object
- Inheritance chain:
  - Object
  - Fluent::Rdkafka2Output::EnqueueRate
- Defined in:
- lib/fluent/plugin/out_rdkafka2.rb
Defined Under Namespace
Classes: LimitExceeded
Instance Method Summary
- #initialize(limit_bytes_per_second) ⇒ EnqueueRate (constructor): returns a new instance of EnqueueRate.
- #raise_if_limit_exceeded(bytes_to_enqueue) ⇒ Object
- #revert ⇒ Object
Constructor Details
#initialize(limit_bytes_per_second) ⇒ EnqueueRate
Returns a new instance of EnqueueRate.
123 124 125 126 127 128 129 |
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 123

# Sets up the bookkeeping used to throttle enqueued bytes.
#
# @param limit_bytes_per_second [Numeric, nil] byte budget compared against
#   the running counter; the other methods of this class return immediately
#   when this is nil
def initialize(limit_bytes_per_second)
  @limit_bytes_per_second = limit_bytes_per_second
  # Bytes counted since @start_clock.
  @bytes_per_second = 0
  # Most recent commit ({ clock:, bytesize: }) recorded per thread.
  @commits = {}
  # Clock reading that marks the beginning of the current counting window.
  @start_clock = Fluent::Clock.now
  # Guards every read/write of the counters above.
  @mutex = Mutex.new
end
Instance Method Details
#raise_if_limit_exceeded(bytes_to_enqueue) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 131

# Records an enqueue of +bytes_to_enqueue+ bytes for the calling thread and
# raises when the running total for the current window is above the limit.
#
# The bytes are added to the counter *before* the limit check, so the counter
# still includes them when LimitExceeded is raised; #revert subtracts them
# again for the calling thread.
#
# @param bytes_to_enqueue [Numeric] size of the payload about to be enqueued
# @raise [LimitExceeded] constructed with @start_clock + 1.0 when the counter
#   exceeds @limit_bytes_per_second less than 1.0 clock units after the
#   window started
def raise_if_limit_exceeded(bytes_to_enqueue)
  return if @limit_bytes_per_second.nil?

  @mutex.synchronize do
    now = Fluent::Clock.now
    @commits[Thread.current] = { clock: now, bytesize: bytes_to_enqueue }
    @bytes_per_second += bytes_to_enqueue

    if now - @start_clock < 1.0
      # Still inside the current window: enforce the budget.
      raise LimitExceeded.new(@start_clock + 1.0) if @bytes_per_second > @limit_bytes_per_second
    else
      # Window elapsed: start a new one that contains only this commit.
      @start_clock = now
      @bytes_per_second = bytes_to_enqueue
    end
  end
end
#revert ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 154

# Undoes the calling thread's most recent commit, if it has one.
#
# The thread's entry is removed from @commits (not merely set to nil) so the
# hash does not keep a key for every thread that ever reverted. Its bytes are
# subtracted from the running counter only when the commit was recorded at or
# after the start of the current window; an older commit belongs to a window
# whose counter has already been replaced.
#
# Does nothing when @limit_bytes_per_second is nil.
#
# @return [nil]
def revert
  return if @limit_bytes_per_second.nil?

  @mutex.synchronize do
    commit = @commits.delete(Thread.current)
    if commit && commit[:clock] && commit[:clock] >= @start_clock
      @bytes_per_second -= commit[:bytesize]
    end
  end

  nil
end