Class: LogStash::Outputs::Kinetica

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::HttpClient
Defined in:
lib/logstash/outputs/kinetica.rb

Defined Under Namespace

Classes: RetryTimerTask

Constant Summary collapse

# List of valid method names (only "post"); frozen so the shared constant cannot be mutated in place.
VALID_METHODS = ["post"].freeze
# Manticore exception classes treated as retryable.
# NOTE(review): presumably consulted by retryable_exception? (not visible here) — confirm.
# Frozen so the shared list cannot be mutated in place.
RETRYABLE_MANTICORE_EXCEPTIONS = [
	::Manticore::Timeout,
	::Manticore::SocketException,
	::Manticore::ClientProtocolException,
	::Manticore::ResolutionFailure,
	::Manticore::SocketTimeout
].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#is_batch ⇒ Object

Returns the value of attribute is_batch.



15
16
17
# File 'lib/logstash/outputs/kinetica.rb', line 15

# Reader for the batch-mode flag.
#
# @return the current value of @is_batch (register sets it to true only when
#   @format is "json_batch")
def is_batch
  @is_batch
end

Instance Method Details

#close ⇒ Object



273
274
275
276
# File 'lib/logstash/outputs/kinetica.rb', line 273

# Shuts the plugin down: cancels the retry timer, then closes the HTTP client.
#
# The timer is only cancelled when it exists, so the client is still closed
# if register never got as far as creating @timer.
#
# @return the result of client.close
def close
	@timer.cancel if @timer
	client.close
end

#log_error_response(response, url, event) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
# File 'lib/logstash/outputs/kinetica.rb', line 147

# Logs a non-retryable error response through log_failure.
#
# The "message" field of a JSON object body is reported as the response
# message. Error bodies are not always JSON objects (empty bodies, HTML error
# pages, JSON arrays), so when the body cannot be parsed as a JSON object the
# raw body is reported instead of letting the parser error escape.
#
# @param response the HTTP response; must respond to #code and #body
# @param url      the URL the request was sent to
# @param event    the event (or batch) that was being sent
# @return the result of log_failure
def log_error_response(response, url, event)
	raw_body = response.body
	response_message =
		begin
			parsed = JSON.parse(raw_body.to_s)
			parsed.is_a?(Hash) ? parsed["message"] : raw_body
		rescue JSON::ParserError
			raw_body
		end

	log_failure(
		"Encountered an error:",
		:response_code => response.code,
		:response_message => response_message,
		:url => url,
		:event => event
	)
end

#log_retryable_response(response) ⇒ Object



139
140
141
142
143
144
145
# File 'lib/logstash/outputs/kinetica.rb', line 139

# Logs a response that is about to be retried.
#
# A 429 status is only reported at debug level (and only when debug logging
# is enabled); every other status is logged as a warning together with the
# response code and body.
#
# @param response the HTTP response; must respond to #code and #body
def log_retryable_response(response)
	status = response.code
	unless status == 429
		return @logger.warn("Encountered a retryable HTTP request in HTTP output, will retry", :code => status, :body => response.body)
	end

	@logger.debug? && @logger.debug("Encountered a 429 response, will retry. This is not serious, just flow control via HTTP")
end

#multi_receive(events) ⇒ Object

Passes a batch of events on to send_events; returns immediately when the batch is empty.



121
122
123
124
# File 'lib/logstash/outputs/kinetica.rb', line 121

# Entry point for a batch of events.
#
# @param events the events to send; must respond to #empty?
# @return nil when events is empty, otherwise the result of send_events
def multi_receive(events)
	send_events(events) unless events.empty?
end

#register ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/logstash/outputs/kinetica.rb', line 88

# Prepares the plugin's runtime state: the request-token queue, the default
# content type, the batch flag, the Content-Type header, the retry timer and
# the text delimiter / escape character settings.
#
# @return the value assigned to @text_escape_character
def register
	#@http_method = @http_method.to_sym

	# Back-pressure tokens: the queue starts out filled with @pool_max tokens.
	# Per the original notes, no new request may be sent while the queue is
	# empty, and the client must put tokens back on success.
	@request_tokens = SizedQueue.new(@pool_max)
	@pool_max.times { @request_tokens.push(true) }

	@requests = []

	# Only the "csv" format supplies a default content type; the mappings for
	# the other formats are disabled:
	#when "form" ; @content_type = "application/x-www-form-urlencoded"
	#when "json" ; @content_type = "application/json"
	#when "json_batch" ; @content_type = "application/json"
	@content_type = "application/json" if @content_type.nil? && "csv" == @format

	@is_batch = ("json_batch" == @format)

	@headers["Content-Type"] = @content_type

	# Named java.util.Timer, created as a daemon thread (second argument true)
	timer_name = "HTTP Output #{params['id']}"
	@timer = java.util.Timer.new(timer_name, true)

	# Fall back to "," when no text delimiter is configured
	configured_delimiter = options["text_delimiter"]
	@text_delimiter = configured_delimiter.nil? ? "," : configured_delimiter

	# Fall back to a single backslash when no escape character is configured
	configured_escape = options["text_escape_character"]
	@text_escape_character = configured_escape.nil? ? "\\" : configured_escape
end

#send_event(event, attempt) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/logstash/outputs/kinetica.rb', line 226

# Sends one event (in batch mode: the whole array of events handed over by
# send_events) with an HTTP POST and classifies the outcome.
#
# Any StandardError raised while building or sending the request is logged
# through log_failure and turned into :retry or :failure depending on
# retryable_exception?.
#
# @param event   the event (or batch) passed to event_body
# @param attempt the attempt counter; returned unchanged
# @return [Array] [action, event, attempt] where action is :success, :retry
#   or :failure
def send_event(event, attempt)
	body = event_body(event)

	# In batch mode the configured URL and headers are used as-is; otherwise
	# both are derived from the individual event.
	url = @is_batch ? @url : event.sprintf(@url)
	headers = @is_batch ? @headers : event_headers(event)

	# Compress the body and add appropriate header
	if @http_compression == true
		headers["Content-Encoding"] = "gzip"
		body = gzip(body)
	end

	# Build the POST request; #call on it yields the response.
	# (No diagnostic output to stdout here: failures are reported via the logger.)
	response = client.post(url, :body => body, :headers => headers).call
	return :success, event, attempt if response_success?(response)

	if retryable_response?(response)
		log_retryable_response(response)
		return :retry, event, attempt
	end

	log_error_response(response, url, event)
	return :failure, event, attempt

rescue => exception
	will_retry = retryable_exception?(exception)
	log_failure("Could not fetch URL",
							:url => url,
							:body => body,
							:headers => headers,
							:message => exception.message,
							:class => exception.class.name,
							:backtrace => exception.backtrace,
							:will_retry => will_retry
	)

	if will_retry
		return :retry, event, attempt
	else
		return :failure, event, attempt
	end
end

#send_events(events) ⇒ Object



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/logstash/outputs/kinetica.rb', line 159

# Sends the given events and loops until every one of them has ended in
# success or failure, re-queueing retryable ones through @timer.
#
# In batch mode (@is_batch) the whole events array is sent as one unit;
# otherwise each event is sent individually. Any StandardError raised in the
# loop is logged and re-raised.
#
# @param events the events to send
def send_events(events)
	# Outcome counters (java.util.concurrent.atomic.AtomicInteger)
	successes = java.util.concurrent.atomic.AtomicInteger.new(0)
	failures	= java.util.concurrent.atomic.AtomicInteger.new(0)
	retries = java.util.concurrent.atomic.AtomicInteger.new(0)
	# Number of units that must finish: one for a batch, else one per event
	event_count = @is_batch ? 1 : events.size

	# Work queue of [event, attempt] pairs; attempts start at 0
	pending = Queue.new
	if @is_batch
		pending << [events, 0]
	else
		events.each {|e| pending << [e, 0]}
	end

	# Queue#pop blocks while the queue is empty, so the loop waits here for
	# scheduled retries or for the :done sentinel pushed below.
	while popped = pending.pop
		break if popped == :done

		event, attempt = popped

		action, event, attempt = send_event(event, attempt)
		begin
			# With @retry_failed falsy, a retryable outcome counts as a failure
			action = :failure if action == :retry && !@retry_failed

			case action
			when :success
				successes.incrementAndGet
			when :retry
				retries.incrementAndGet
				next_attempt = attempt+1
				sleep_for = sleep_for_attempt(next_attempt)
				@logger.info("Retrying http request, will sleep for #{sleep_for} seconds")
				# NOTE(review): RetryTimerTask is not visible here; presumably it
				# pushes [event, next_attempt] back onto pending when it fires — confirm.
				timer_task = RetryTimerTask.new(pending, event, next_attempt)
				# java.util.Timer#schedule takes the delay in milliseconds
				@timer.schedule(timer_task, sleep_for*1000)
			when :failure
				failures.incrementAndGet
			else
				raise "Unknown action #{action}"
			end

			# Once every unit has ended in success or failure, push the sentinel
			# that ends the loop
			if action == :success || action == :failure
				if successes.get+failures.get == event_count
					pending << :done
				end
			end
		rescue => e
			# This should never happen unless there's a flat out bug in the code
			@logger.error("Error sending HTTP Request",
				:class => e.class.name,
				:message => e.message,
				:backtrace => e.backtrace)
			failures.incrementAndGet
			raise e
		end
	end
rescue => e
	# Also receives errors re-raised by the inner rescue, so those are logged twice
	@logger.error("Error in http output loop",
					:class => e.class.name,
					:message => e.message,
					:backtrace => e.backtrace)
	raise e
end

#sleep_for_attempt(attempt) ⇒ Object



220
221
222
223
224
# File 'lib/logstash/outputs/kinetica.rb', line 220

# Computes the delay before a retry: the square of the attempt number, capped
# at 60, split into a fixed half plus a random share of up to another half.
#
# With Integer input every division is integer division, so attempt 1 always
# yields 0.
#
# @param attempt the number of the upcoming attempt
# @return the delay; send_events treats it as seconds
def sleep_for_attempt(attempt)
	backoff = attempt**2
	backoff = 60 if backoff > 60

	fixed_part = backoff / 2
	jitter = rand(0..backoff) / 2
	fixed_part + jitter
end