Class: Fluent::Plugin::SdnsApiListThreatDetailInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_sdns_api.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#query_end_timeObject

Returns the value of attribute query_end_time.



25
26
27
# File 'lib/fluent/plugin/in_sdns_api.rb', line 25

def query_end_time
  @query_end_time
end

#query_start_timeObject

Returns the value of attribute query_start_time.



25
26
27
# File 'lib/fluent/plugin/in_sdns_api.rb', line 25

def query_start_time
  @query_start_time
end

Instance Method Details

#configure(conf) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fluent/plugin/in_sdns_api.rb', line 61

def configure(conf)
  super
  if @access_key == ""
    raise Fluent::ConfigError, "access_key is required"
  end
  if @access_secret == ""
    raise Fluent::ConfigError, "access_secret is required"
  end
  if @endpoint == ""
    raise Fluent::ConfigError, "endpoint is required"
  end
  if @scrape_interval < 60
    raise Fluent::ConfigError, "scrape_interval must be greater than 60"
  end
  @query_start_time = (Time.now.to_i - @scrape_interval).to_i
  $log.info("sdns api query start time: #{@query_start_time}")
end

#get_full_url(start_time, end_time, cur_page) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/in_sdns_api.rb', line 88

def get_full_url(start_time, end_time, cur_page)
  uri_parse = URI.parse(@endpoint)
  query = CGI.parse(uri_parse.query ? uri_parse.query : "")
  query["time_start"] = start_time
  query["time_end"] = end_time
  query["cur_page"] = cur_page
  query["page_size"] = @page_size
  query["domain"] = @domain if @domain != ""
  query["client_ip"] = @client_ip if @client_ip != ""
  query["security_zone_id"] = @security_zone_id if @security_zone_id != ""
  query["threat_level"] = @threat_level if @threat_level.length > 0

  uri_parse.query = URI.encode_www_form(query)
  uri_parse.to_s
end

#get_sdns_api_data(full_url) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/fluent/plugin/in_sdns_api.rb', line 161

def get_sdns_api_data(full_url)
  begin
    sig = SdnsApiSinger::Signer.new
    sig.key = @access_key
    sig.secret = @access_secret

    r = SdnsApiSinger::HttpRequest.new("GET", full_url)
    r.headers = {"content-type" => "application/json"}
    r.body = ''
    sig.sign(r)
    uri = URI.parse("#{r.scheme}://#{r.host}#{r.uri}")
    http = Net::HTTP.new(uri.host, uri.port)
    http.use_ssl = true if uri.scheme == 'https'
    request = Net::HTTP::Get.new(uri)
    request.initialize_http_header(r.headers)
    response = http.request(request)
    if response.code != "200"
      $log.error("sdns api query failed: #{response.code} #{response.message}")
      return
    end
    response_body_hash = JSON.parse(response.body)
    if response_body_hash["errno"] != 0
      $log.error("sdns api query failed: #{response_body_hash["errno"]} #{response_body_hash["errmsg"]}")
      return 
    end
    response_body_hash
  rescue => e
    $log.error(e.message)
  end
end

#refresh_watchersObject



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/fluent/plugin/in_sdns_api.rb', line 104

def refresh_watchers
  begin
    cur_page = 1
    start_time = @query_start_time
    end_time = Time.now.to_i

    @query_start_time = end_time # set next query start time
    $log.debug("sdns api query time duration: #{start_time} - #{end_time}")
    full_url = get_full_url(start_time, end_time, cur_page)
    $log.debug("sdns api query: #{full_url}")
    res = get_sdns_api_data(full_url)
    if res.nil?
      return
    end
    $log.info("sdns api query result total: #{res['data']['total']}")
    send_msg(res['data']['items'])
    for cur_page in 2..res['data']['total_pages'] do
      full_url = get_full_url(start_time, end_time, cur_page)
      $log.debug("sdns api query: #{full_url}")
      res = get_sdns_api_data(full_url)
      if res.nil?
        return
      end
      # puts res['data']['cur_page']
      send_msg(res['data']['items'])
    end
  rescue => e
    $log.error(e.message)
  end
end

#send_msg(items) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/fluent/plugin/in_sdns_api.rb', line 135

def send_msg(items)
  begin
    for item in items do
      record = {
        "query_timestamp" => item["query_timestamp"],
        "query_time_format" => item["query_time_format"],
        "security_zone_id" => item["security_zone_id"],
        "security_zone_name" => item["security_zone_name"],
        "client_ip" => item["client_ip"],
        "client_port" => item["client_port"],
        "domain" => item["domain"],
        "query_type"  => item["query_type"],
        "action" => item["action"],
        "threat_level" => item["threat_level"],
        "threat_type" => item["threat_type"],
        "threat_name" => item["threat_name"],
        "threat_description" => item["threat_description"].gsub("\n", '\\n').gsub("\t", '\\t'),
      }
      es = OneEventStream.new(Fluent::EventTime.now, record)
      router.emit_stream(@tag, es)
    end
  rescue => e
    $log.error(e.message)
  end
end

#shutdownObject



192
193
194
# File 'lib/fluent/plugin/in_sdns_api.rb', line 192

def shutdown
  super
end

#startObject



79
80
81
82
83
84
85
86
# File 'lib/fluent/plugin/in_sdns_api.rb', line 79

def start
  super
  # Startup code goes here!
  $log.info("sdns api start")
  # run inmediately
  refresh_watchers()
  timer_execute(:execute_sdns_api, @scrape_interval, &method(:refresh_watchers))
end