Class: LogStash::Outputs::HTTPBatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/HTTPBatcher.rb

Instance Method Summary collapse

Constructor Details

#initialize(url, idle_flush, logger, headers, limit, nthreads, verbose) ⇒ HTTPBatcher

Returns a new instance of HTTPBatcher.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/logstash/outputs/HTTPBatcher.rb', line 11

def initialize(url, idle_flush, logger, headers, limit, nthreads, verbose)
  @url = URI.parse(url)
  @idle_flush = idle_flush
  @logger = logger
  @headers = headers
  @limit = limit
  @verbose = verbose

  @content_type = "application/json"
  @stopped = false

  @queue_mutex = Mutex.new
  @queue = []

  @flush_time = nil
  @flush_thread = create_flush_thread()
  @work_queue = WorkQueue.new nthreads, nil
end

Instance Method Details

#create_flush_threadObject

def receive



58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/logstash/outputs/HTTPBatcher.rb', line 58

def create_flush_thread
  return Thread.new do
    while !@stopped do
      now = Time.now
      if @flush_time != nil && @flush_time <= now
        enqueue_batch
        @flush_time = nil
      end

      sleep(@flush_time == nil ? @idle_flush : @flush_time - now)
    end
  end
end

#enqueue_batchObject



72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/logstash/outputs/HTTPBatcher.rb', line 72

def enqueue_batch
  tosend = []
  @queue_mutex.synchronize do
    tosend = @queue.shift(@limit)
  end

  if tosend.size > 0
    @work_queue.enqueue_b do
      send_batch tosend
    end
  end
end

#receive(event) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/logstash/outputs/HTTPBatcher.rb', line 44

def receive(event)
  size = 0
  @queue_mutex.synchronize do
    @queue << event
    size = @queue.size
  end

  if size >= @limit
    enqueue_batch
  end

  @flush_time = Time.now + @idle_flush
end

#send_batch(tosend) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/logstash/outputs/HTTPBatcher.rb', line 85

def send_batch(tosend)
  connection = Thread.current["connection"]
  if connection == nil
    if @verbose
      puts "creating new https connection"
    end

    connection = Net::HTTP.new(@url.host, @url.port)
    connection.use_ssl = true
    connection.verify_mode = OpenSSL::SSL::VERIFY_PEER
    Thread.current["connection"] = connection
  end

  beginning = Time.now
  request = Net::HTTP::Post.new(@url.request_uri)

  request["Content-Type"] = @content_type
  if @headers
    @headers.each do |k,v|
      request.headers[k] = v
    end
  end
  
  if @verbose
    puts "posting #{tosend.size} records"
  end

  request.body = LogStash::Json.dump(tosend)
  response = connection.request request

  status = response.code
  rbody = response.read_body

  if status != "200"
    raise "POST failed with status #{status} (#{rbody})"
  end

  if @verbose
    time = Time.now - beginning
    puts "POST response in #{time.to_s} #{status} #{rbody}"
  end

rescue Exception => e
  if @verbose
    @logger.warn("Unhandled exception", :request => request, :exception => e, :stacktrace => e.backtrace)
  else
    @logger.warn("Unhandled exception", :host => request["host"], :exception => e, :stacktrace => e.backtrace)
  end
end

#stopObject

def initialize



30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/logstash/outputs/HTTPBatcher.rb', line 30

def stop
  if @verbose
    puts "stopping batcher (have #{@queue.size()} queued message)"
  end

  @stopped = true
  while @queue.size() > 0 do
    enqueue_batch
  end

  @flush_thread.join
  @work_queue.join
end