Class: Fluent::SomeOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_coralogix.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/out_coralogix.rb', line 33

# Prepares the plugin's worker slots before Fluentd starts it.
#
# Resets the round-robin cursor, creates one logger cache (a Hash) per worker slot and,
# when neither appname nor subsystemname starts with "$", pre-builds the single
# CoralogixLogger each slot will use. Names starting with "$" are left to be resolved
# per record later, so their slots start out empty.
#
# When an endpoint is set, the Coralogix URLs are exported through ENV.
#
# Ordinary errors are logged and swallowed so a bad setting does not abort startup;
# signals and exit requests are allowed to propagate.
#
# @param conf [Object] configuration element, handed to the parent class via +super+
def configure(conf)
  super
  begin
    @currentWorker = 0
    @workers = {}
    @app_name_from_config = DEFAULT_appname
    @sub_name_from_config = DEFAULT_subsystemname

    log.info "configure #{number_of_workers} workers"
    # Integer#times (unlike a manual `until i == n` counter) cannot spin forever
    # when the worker count is negative or not a number.
    number_of_workers.times do |i|
      slot = @workers[i.to_s] = {}
      log.info "init worker ##{i}"
      private_key = get_private_key

      # "$"-prefixed names are looked up per record, so nothing can be pre-built.
      next if appname.start_with?("$") || subsystemname.start_with?("$")

      @app_name_from_config = config.fetch("APP_NAME", appname)
      @sub_name_from_config = config.fetch("SUB_SYSTEM", subsystemname)
      slot["#{@app_name_from_config}.#{@sub_name_from_config}"] = CoralogixLogger.new(
        private_key,
        @app_name_from_config,
        @sub_name_from_config,
        false,
        "FluentD (#{version?})",
        force_compression,
        proxy ? proxy.to_h.map { |k, v| [k.to_s, v] }.to_h : Hash.new,
        log
      )
    end

    unless endpoint.nil?
      ENV["CORALOGIX_LOG_URL"] = "https://#{endpoint}/api/v1/logs"
      ENV["CORALOGIX_TIME_DELTA_URL"] = "https://#{endpoint}/sdk/v1/time"
    end
  rescue StandardError => e
    log.error "Failed to configure: #{e}"
  end
end

#emit(tag, es, chain) ⇒ Object

This method is called when an event reaches Fluentd. ‘es’ is a Fluent::EventStream object that includes multiple events. You can use ‘es.each {|time,record| … }’ to retrieve events. ‘chain’ is an object that manages transactions. Call ‘chain.next’ at appropriate points and rollback if it raises an exception.

NOTE! This method is called by Fluentd’s main thread, so you should not put slow routines here. Doing so degrades Fluentd’s performance.



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/fluent/plugin/out_coralogix.rb', line 141

# Sends every event of the stream to the logger selected by #get_logger.
#
# The payload is the value stored under log_key_name (or the whole record when that
# option is nil or the key is absent), JSON-encoded when is_json is set. If the payload's
# string form is empty, the whole record is sent instead.
#
# When the record carries a value under timestamp_key_name, it is parsed and passed to
# the logger as epoch seconds multiplied by 1000. If parsing or the timestamped send
# fails with an ordinary error, the record is sent without an explicit timestamp.
# Signals and exit requests are not swallowed.
#
# @param tag [Object] Fluentd tag of the stream (not used by this method)
# @param es [#each] event stream yielding (time, record) pairs
# @param chain [#next] output chain; advanced before any record is processed
def emit(tag, es, chain)
  chain.next
  es.each do |_time, record|
    logger = get_logger(record)

    log_record = log_key_name.nil? ? record : record.fetch(log_key_name, record)
    log_record = log_record.to_json if is_json
    log_record = record if log_record.to_s.empty?

    timestamp = record.fetch(timestamp_key_name, nil)
    if timestamp.nil?
      logger.debug log_record
      next
    end

    begin
      float_timestamp = DateTime.parse(timestamp.to_s).to_time.to_f * 1000
      logger.debug log_record, nil, timestamp: float_timestamp
    rescue StandardError
      # Best effort: an unusable timestamp must not cost us the log line itself.
      logger.debug log_record
    end
  end
end

#extract(record, key, default) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/fluent/plugin/out_coralogix.rb', line 78

# Resolves a configured name against a record.
#
# A key that does not start with "$" is a literal and is returned unchanged. Otherwise
# the text after "$" is split on "." and used as a path of nested keys, fetched one
# level at a time starting from +record+.
#
# @param record [#fetch] event record to look the path up in
# @param key [String] literal value, or "$"-prefixed dotted path such as "$a.b.c"
# @param default [Object] value returned when any path segment is missing (or nil),
#   or when the lookup raises an ordinary error
# @return [Object] the literal key, the value found at the path, or +default+
def extract(record, key, default)
  return key unless key.start_with?("$")

  key[1..-1].split(".").reduce(record) do |node, segment|
    value = node.fetch(segment, nil)
    return default if value.nil?

    value
  end
rescue StandardError => e
  # Covers e.g. a path that descends into a non-Hash value. Signals and exit
  # requests are deliberately not caught here.
  log.error "Failed to extract #{key}: #{e}"
  default
end

#get_app_sub_name(record) ⇒ Object



94
95
96
97
98
# File 'lib/fluent/plugin/out_coralogix.rb', line 94

# Resolves the application and subsystem names for a record.
#
# Both names go through #extract, using the configured appname / subsystemname as the
# key and the DEFAULT_* constants as fallbacks.
#
# @param record [Object] event record handed to #extract
# @return [Array] two-element array: [application name, subsystem name]
def get_app_sub_name(record)
  [
    extract(record, appname, DEFAULT_appname),
    extract(record, subsystemname, DEFAULT_subsystemname)
  ]
end

#get_logger(record) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/fluent/plugin/out_coralogix.rb', line 100

# Returns the CoralogixLogger to use for +record+, building and caching it on first use.
#
# Loggers are cached per worker slot under the key "<app_name>.<sub_name>", and
# successive calls rotate through the worker slots in order. The names come from the
# values stored by #configure unless appname or subsystemname starts with "$", in which
# case they are resolved from the record via #get_app_sub_name.
#
# @param record [Object] event record; only consulted for "$"-prefixed names
# @return [CoralogixLogger] cached or newly created logger for the resolved names
def get_logger(record)
  if appname.start_with?("$") || subsystemname.start_with?("$")
    app_name, sub_name = get_app_sub_name(record)
  else
    app_name = @app_name_from_config
    sub_name = @sub_name_from_config
  end

  # YK@2020-11-26T10:56 - We had encountered a case in which this value reached above 7K and the value of worker became null
  @currentWorker = 0 if @currentWorker >= number_of_workers

  # A slot can be missing if #configure stopped early (it logs and swallows errors),
  # so create it on demand instead of failing on nil.
  worker = (@workers[@currentWorker.to_s] ||= {})
  @currentWorker += 1

  # The private key is only needed when a logger actually has to be built.
  worker["#{app_name}.#{sub_name}"] ||= CoralogixLogger.new(
    get_private_key,
    app_name,
    sub_name,
    false,
    "FluentD (#{version?})",
    force_compression,
    proxy ? proxy.to_h.map { |k, v| [k.to_s, v] }.to_h : Hash.new,
    log
  )
end

#get_private_key ⇒ Object



66
67
68
# File 'lib/fluent/plugin/out_coralogix.rb', line 66

# Looks up the private key to use: the "PRIVATE_KEY" entry of the plugin config
# when present, otherwise the value of privatekey.
#
# @return [Object] the resolved private key
def get_private_key
  settings = config
  settings.fetch("PRIVATE_KEY", privatekey)
end

#shutdown ⇒ Object

This method is called when shutting down.



130
131
132
# File 'lib/fluent/plugin/out_coralogix.rb', line 130

# Shutdown hook. Adds no behavior of its own; it only delegates to the parent
# class's implementation.
def shutdown
  super
end

#start ⇒ Object

This method is called when starting.



125
126
127
# File 'lib/fluent/plugin/out_coralogix.rb', line 125

# Start hook. Adds no behavior of its own; it only delegates to the parent
# class's implementation.
def start
  super
end

#version? ⇒ Boolean

Returns:

  • (Boolean)


70
71
72
73
74
75
76
# File 'lib/fluent/plugin/out_coralogix.rb', line 70

# Reports the version of the loaded 'fluent-plugin-coralogix' gem.
#
# Despite the predicate-style name, this returns a version string, not a boolean.
#
# @return [String] the gem's version, or '0.0.0' when the gem is not among the
#   loaded specs or the lookup fails with an ordinary error
def version?
  spec = Gem.loaded_specs['fluent-plugin-coralogix']
  # "Gem not loaded" is an expected case, so handle it explicitly
  # rather than through a rescued NoMethodError.
  spec ? spec.version.to_s : '0.0.0'
rescue StandardError
  '0.0.0'
end