Class: Fluent::Plugin::SeqOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_seq.rb

Constant Summary collapse

# Maps upper-case level names found in incoming records to the level names
# written into an event's "Level" field. Frozen so the shared table cannot be
# modified at runtime.
LOG_LEVELS =
{
  'TRACE' => 'Verbose',
  'DEBUG' => 'Debug',
  'INFO' => 'Information',
  'WARNING' => 'Warning',
  'ERROR' => 'Error',
  'FATAL' => 'Fatal'
}.freeze

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/fluent/plugin/out_seq.rb', line 43

# Applies the configuration and derives the endpoint URLs used by the plugin.
#
# Sets @server_url (always ending in "/") and @base_api ("<server_url>api")
# from @scheme, @host, @port and the optional @path. Logs a warning when the
# scheme is https but the port is not 443.
#
# @param conf the configuration object, passed on to the parent class by super
def configure(conf)
  super

  if @scheme == "https" && @port != 443
    log.warn "Scheme is set to https but port is not 443 (port: #{@port})"
  end

  host = @host.chomp('/') # remove last '/'

  # Strip one leading and one trailing '/' so the parts can be re-joined with
  # exactly one separator each.
  path = @path ? @path.gsub(%r{\A/|/\z}, '') : ''

  # A non-empty path needs its own trailing '/'. Without it, suffixes appended
  # to @server_url ("api", "health") would be glued onto the last path segment,
  # e.g. ".../seqapi" instead of ".../seq/api".
  path = "#{path}/" unless path.empty?

  @server_url = "#{@scheme}://#{host}:#{@port}/#{path}"
  @base_api = "#{@server_url}api"
end

#format_event(time, record) ⇒ Object



74
75
76
77
78
79
80
81
82
# File 'lib/fluent/plugin/out_seq.rb', line 74

# Builds the event hash that is posted to Seq for a single record.
#
# @param time   the event time; anything Time.at accepts
# @param record [Hash] the log record; it is also included unchanged under
#   the "Properties" key
# @return [Hash] event with :Timestamp, :MessageTemplate, :Level, :Exception
#   and :Properties keys
def format_event(time, record)
  # Level names are matched case-insensitively. A missing or unrecognised
  # level falls back to the configured default level.
  raw_level = record['Level'] || record['level']
  level = LOG_LEVELS[raw_level.to_s.upcase] || LOG_LEVELS[@default_level.to_s.upcase]

  # record['err'] may be a plain String, on which Hash#dig would raise a
  # TypeError; only look inside it when it really is a Hash.
  err = record['err']
  nested_stack = err.is_a?(Hash) ? err['stack'] : nil

  {
    # ISO 8601 in UTC with millisecond precision (Time#to_s would drop the
    # sub-second part).
    "Timestamp": Time.at(time).utc.strftime('%Y-%m-%dT%H:%M:%S.%LZ'),
    "MessageTemplate": record['MessageTemplate'] || record['message'] || record['msg'] || record['log'] || '(No message provided)',
    "Level": level,
    "Exception": record['stack'] || nested_stack || record['exc_info'],
    "Properties": record
  }
end

#post_events(events) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/fluent/plugin/out_seq.rb', line 84

# Posts a batch of events as JSON to "<@base_api>/events/raw".
#
# The request carries an "X-Seq-ApiKey" header when @api_key is set. Errors
# are logged and not re-raised, so a failed post never propagates to the
# caller.
#
# @param events [Array<Hash>] events to send, wrapped as {"Events": [...]}
def post_events(events)
  uri = URI.parse("#{@base_api}/events/raw")

  headers = {'Content-Type' => 'application/json'}
  headers['X-Seq-ApiKey'] = @api_key if @api_key

  req = Net::HTTP::Post.new(uri.path, headers)
  req.body = {"Events": events}.to_json

  # The block form of Net::HTTP.start closes the connection once the request
  # has completed, instead of leaving one open socket behind per call.
  res = Net::HTTP.start(uri.host, uri.port, use_ssl: @scheme == "https") do |http|
    http.request(req)
  end

  if res.is_a?(Net::HTTPSuccess)
    log.debug "#{res.code} #{res.message}"
  else
    # A non-2xx answer means the events were not accepted; make that visible.
    log.error "Failed to post events to Seq: #{res.code} #{res.message}"
  end
rescue StandardError => e
  # StandardError only: signals and process exit must not be swallowed here.
  log.error e
end

#process(tag, es) ⇒ Object



106
107
108
109
110
111
# File 'lib/fluent/plugin/out_seq.rb', line 106

# Formats every event yielded by +es+ and posts each one on its own, as a
# single-element batch.
#
# @param tag the tag of the event stream (not used by this method)
# @param es  an object whose #each yields (time, record) pairs
def process(tag, es)
  es.each { |time, record| post_events([format_event(time, record)]) }
end

#start ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/fluent/plugin/out_seq.rb', line 61

# Starts the plugin and registers a timer that requests "<server_url>/health"
# every 60 seconds. The response body is logged at debug level; a failed
# request is logged as an error together with the reason.
def start
  super

  # Join with exactly one '/': @server_url has no trailing slash when a path
  # is configured, and "health" must not be glued onto the last path segment.
  uri = URI.parse("#{@server_url.chomp('/')}/health")

  timer_execute(:health_timer, 60) do
    health = Net::HTTP.get(uri)
    log.debug health
  rescue StandardError => e
    log.error "Cannot connect to Seq server at #{@server_url}: #{e.message}"
  end
end

#write(chunk) ⇒ Object



113
114
115
116
117
118
# File 'lib/fluent/plugin/out_seq.rb', line 113

# Formats every (time, record) pair of the chunk and sends the whole list in
# one post_events call.
#
# @param chunk an object whose #each yields (time, record) pairs
def write(chunk)
  batch = []
  chunk.each do |time, record|
    batch << format_event(time, record)
  end
  post_events(batch)
end