Class: Writer
- Inherits:
-
Object
- Object
- Writer
- Includes:
- MonitorMixin
- Defined in:
- lib/fluent/command/cat.rb
Defined Under Namespace
Classes: TimerThread
Constant Summary collapse
- RetryLimitError =
Class.new(StandardError)
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(tag, connector, time_as_integer: false, retry_limit: 5, event_time: nil) ⇒ Writer
constructor
A new instance of Writer.
- #on_timer ⇒ Object
- #secondary_record?(record) ⇒ Boolean
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(record) ⇒ Object
Constructor Details
#initialize(tag, connector, time_as_integer: false, retry_limit: 5, event_time: nil) ⇒ Writer
Returns a new instance of Writer.
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/fluent/command/cat.rb', line 142 def initialize(tag, connector, time_as_integer: false, retry_limit: 5, event_time: nil) @tag = tag @connector = connector @socket = false @socket_time = Time.now.to_i @socket_ttl = 10 # TODO @error_history = [] @pending = [] @pending_limit = 1024 # TODO @retry_wait = 1 @retry_limit = retry_limit @time_as_integer = time_as_integer @event_time = event_time super() end |
Instance Method Details
#close ⇒ Object
222 223 224 225 |
# File 'lib/fluent/command/cat.rb', line 222 def close @socket.close @socket = nil end |
#on_timer ⇒ Object
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/fluent/command/cat.rb', line 203 def on_timer now = Time.now.to_i synchronize { unless @pending.empty? # flush pending records if write_impl(@pending) # write succeeded @pending.clear end end if @socket && @socket_time + @socket_ttl < now # socket is not used @socket_ttl seconds close end } end |
#secondary_record?(record) ⇒ Boolean
161 162 163 164 165 166 |
# File 'lib/fluent/command/cat.rb', line 161 def secondary_record?(record) record.class != Hash && record.size == 2 && record.first.class == Fluent::EventTime && record.last.class == Hash end |
#shutdown ⇒ Object
233 234 235 |
# File 'lib/fluent/command/cat.rb', line 233 def shutdown @timer.shutdown end |
#start ⇒ Object
227 228 229 230 231 |
# File 'lib/fluent/command/cat.rb', line 227 def start @timer = TimerThread.new(self) @timer.start self end |
#write(record) ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/fluent/command/cat.rb', line 168 def write(record) unless secondary_record?(record) if record.class != Hash raise ArgumentError, "Input must be a map (got #{record.class})" end end time = if @event_time Fluent::EventTime.parse(@event_time) else Fluent::EventTime.now end time = time.to_i if @time_as_integer entry = if secondary_record?(record) # Even though secondary contains Fluent::EventTime in record, # fluent-cat just ignore it and set Fluent::EventTime.now instead. # This specification is adopted to keep consistency. [time, record.last] else [time, record] end synchronize { unless write_impl([entry]) # write failed @pending.push(entry) while @pending.size > @pending_limit # exceeds pending limit; trash oldest record time, record = @pending.shift (time, record) end end } end |