Class: LogStash::Outputs::Dis

Inherits: LogStash::Outputs::Base < Object

Defined in: lib/logstash/outputs/dis.rb

Overview

Writes events to a DIS (Huawei Cloud Data Ingestion Service) stream using the DIS Kafka Adapter.
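
A minimal sketch of driving this output directly, the way a plugin spec would. The option names used below (stream, endpoint, ak, sk) are assumptions about the plugin's config block and are not confirmed by this page; check the plugin's config declarations for the real names and which ones are required.

require "logstash/outputs/dis"
require "logstash/event"

# Hypothetical settings; option names are guesses, not the confirmed API.
output = LogStash::Outputs::Dis.new(
  "stream"   => "my-dis-stream",
  "endpoint" => "https://dis.example.com",
  "ak"       => "MY_ACCESS_KEY",
  "sk"       => "MY_SECRET_KEY"
)

output.register                                                   # builds the producer and codec hooks
output.multi_receive([LogStash::Event.new("message" => "hello")]) # encode, batch, send
output.close                                                      # close the underlying producer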

Instance Method Summary

  • #close ⇒ Object
  • #multi_receive(events) ⇒ Object
  • #prepare(record) ⇒ Object
  • #register ⇒ Object
  • #retrying_send(batch) ⇒ Object

Instance Method Details

#close ⇒ Object



# File 'lib/logstash/outputs/dis.rb', line 201

def close
  @producer.close   # shut down the underlying DIS producer
end

#multi_receive(events) ⇒ Object



# File 'lib/logstash/outputs/dis.rb', line 140

def multi_receive(events)
  t = Thread.current
  if !@thread_batch_map.include?(t)
    @thread_batch_map[t] = java.util.ArrayList.new(events.size)
  end

  events.each do |event|
    break if event == LogStash::SHUTDOWN
    # Encoding fires the codec's on_event hook (wired up in #register),
    # which ends up calling #prepare and filling this thread's batch.
    @codec.encode(event)
  end

  batch = @thread_batch_map[t]
  if batch.any?
    retrying_send(batch)
    batch.clear
  end
end
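
Each pipeline worker thread calls #multi_receive with its own batch of events, which is why the method keys batches by Thread.current. A sketch of that contract, reusing the hypothetical output instance from the Overview:

workers = 2.times.map do |i|
  Thread.new do
    # Every thread gets a private java.util.ArrayList in @thread_batch_map,
    # so no locking is needed around the batch itself.
    output.multi_receive([LogStash::Event.new("message" => "from worker #{i}")])
  end
end
workers.each(&:join)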

#prepare(record) ⇒ Object




# File 'lib/logstash/outputs/dis.rb', line 135

def prepare(record)
  # This output is threadsafe, so we need to keep a batch per thread.
  @thread_batch_map[Thread.current].add(record)
end
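
#prepare is reached indirectly: @codec.encode fires the on_event hook wired up in #register, which calls write_to_dis (a method this page does not show). A plausible reconstruction of that bridge, by analogy with the Kafka output this plugin derives from; the ProducerRecord class path and the @stream ivar are guesses, not the plugin's confirmed API:

# Hypothetical sketch -- consult the actual source for the real body.
def write_to_dis(event, serialized_data)
  record = com.huaweicloud.dis.adapter.kafka.clients.producer.ProducerRecord.new(
    event.sprintf(@stream),   # target stream name; exact ivar is a guess
    serialized_data
  )
  prepare(record)             # append to this thread's batch
end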

#register ⇒ Object



# File 'lib/logstash/outputs/dis.rb', line 107

def register
  @thread_batch_map = Concurrent::Hash.new

  if !@retries.nil?
    if @retries < 0
      raise ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0"
    end

    @logger.warn("DIS output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data when DIS is unavailable, then you must remove the retry setting.", :retries => @retries)
  end

  @producer = create_producer
  # Wire the codec so every encoded event is handed to write_to_dis,
  # either as a String or as a Java byte array, depending on the serializer.
  if value_serializer == 'com.huaweicloud.dis.adapter.kafka.common.serialization.StringSerializer'
    @codec.on_event do |event, data|
      write_to_dis(event, data)
    end
  elsif value_serializer == 'com.huaweicloud.dis.adapter.kafka.common.serialization.ByteArraySerializer'
    @codec.on_event do |event, data|
      write_to_dis(event, data.to_java_bytes)
    end
  else
    raise ConfigurationError, "'value_serializer' only supports com.huaweicloud.dis.adapter.kafka.common.serialization.ByteArraySerializer and com.huaweicloud.dis.adapter.kafka.common.serialization.StringSerializer"
  end
end
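
As the check above shows, value_serializer accepts exactly two class names. A short example of selecting the byte-array serializer; connection options are omitted here for brevity (see the hedged Overview sketch for those):

output = LogStash::Outputs::Dis.new(
  "value_serializer" => "com.huaweicloud.dis.adapter.kafka.common.serialization.ByteArraySerializer"
  # ...plus the connection options shown in the Overview sketch...
)
output.register   # any other serializer value raises ConfigurationError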

#retrying_send(batch) ⇒ Object



# File 'lib/logstash/outputs/dis.rb', line 158

def retrying_send(batch)
  remaining = @retries

  while batch.any?
    if !remaining.nil?
      if remaining < 0
        # TODO(sissel): Offer to DLQ? Then again, if it's a transient fault,
        # DLQing would make things worse (you dlq data that would be successful
        # after the fault is repaired)
        logger.info("Exhausted user-configured retry count when sending to DIS. Dropping these events.",
                    :max_retries => @retries, :drop_count => batch.count)
        break
      end

      remaining -= 1
    end

    failures = []

    futures = batch.collect do |record|
      begin
        # send() can throw an exception even before the future is created.
        @producer.send(record)
      rescue org.apache.kafka.common.errors.TimeoutException => e
        failures << record
        nil
      rescue org.apache.kafka.common.errors.InterruptException => e
        failures << record
        nil
      rescue com.huaweicloud.dis.adapter.kafka.common.errors.SerializationException => e
        # TODO(sissel): Retrying will fail because the data itself has a problem serializing.
        # TODO(sissel): Let's add DLQ here.
        failures << record
        nil
      end
    end.compact

    # Note: as written, the loop exits after a single pass, so the records
    # collected in `failures` (and the returned futures) are not re-sent here.
    break
  end
end