Class: R7Insight::Host::CONNECTION

Inherits:
Object
  • Object
show all
Includes:
InstanceMethods
Defined in:
lib/r7_insight/host/connection.rb

Overview

Class for connecting to the Rapid7 Insight Platform and handling the connection

Constant Summary collapse

DATA_ENDPOINT =
'.data.logs.insight.rapid7.com'
DATA_PORT_UNSECURE =
80
DATA_PORT_SECURE =
443
API_SSL_PORT =
20_000
SHUTDOWN_COMMAND =

magic shutdown command string for worker

'DIE!DIE!'
SHUTDOWN_MAX_WAIT =

max seconds to wait for queue shutdown clearing

10
SHUTDOWN_WAIT_STEP =

sleep duration (seconds) while shutting down

0.2
CONNECTION_EXCEPTIONS =
[
  Timeout::Error,
  Errno::EHOSTUNREACH,
  Errno::ECONNREFUSED,
  Errno::ECONNRESET,
  Errno::ETIMEDOUT,
  EOFError,
  Errno::EPIPE
].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from InstanceMethods

#format_message, #formatter

Constructor Details

#initialize(token, region, local, debug, ssl, datahub_endpoint, host_id, custom_host, udp_port, use_data_endpoint) ⇒ CONNECTION

Returns a new instance of CONNECTION.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/r7_insight/host/connection.rb', line 36

def initialize(token, region, local, debug, ssl, datahub_endpoint,
               host_id, custom_host, udp_port, use_data_endpoint)
  if local
    device = if local.class <= TrueClass
               if defined?(Rails)
                 Rails.root.join('log', "#{Rails.env}.log")
               else
                 STDOUT
               end
             else
               local
             end
    @logger_console = Logger.new(device)
  end

  @region = region
  @local = local.nil? || local == false ? false : true # Replace "!!"
  @debug = debug
  @ssl = ssl
  @udp_port = udp_port
  @use_data_endpoint = use_data_endpoint

  @datahub_endpoint = datahub_endpoint
  if !@datahub_endpoint[0].empty?
    @datahub_enabled = true
    @datahub_ip = (@datahub_endpoint[0]).to_s
    @datahub_port = @datahub_endpoint[1]
  else
    @datahub_enabled = false
  end

  if @datahub_enabled && @ssl
    puts("\n\nYou Cannot have DataHub and SSL enabled at the same time.
 Please set SSL value to false in your environment.rb file or used Token-Based
 logging by leaving the Datahub IP address blank. Exiting application. \n\n")
    exit
  end

  # Check if region was specified
  puts("\n\nYou need to specify a region, such as 'eu'") if region.empty?

  # Check if DataHub is enabled
  # If datahub is not enabled, set the token to the token's parameter
  # If DH is enabled, make the token empty.
  if !datahub_enabled
    @token = token
  else
    @token = ''

    # !NOTE THIS @datahub_port conditional MAY NEED TO BE CHANGED IF SSL
    # CAN'T WORK WITH DH
    @datahub_port = @datahub_port.empty? ? API_SSL_PORT : datahub_port
    @datahub_ip = datahub_ip
  end

  @host_name_enabled = custom_host[0]
  @host_name = custom_host[1]

  if !host_id.empty?
    @host_id = host_id
    @host_id = "host_id=#{host_id}"
  else
    @host_id = ''
  end

  #  If no host name is given but required, assign the machine name
  if @host_name_enabled
    @host_name = Socket.gethostname if host_name.empty?

    @host_name = "host_name=#{@host_name}"
  end

  @queue = Queue.new
  @started = false
  @thread = nil

  init_debug if @debug
  at_exit { shutdown! }
end

Instance Attribute Details

#connObject

Returns the value of attribute conn.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def conn
  @conn
end

#custom_hostObject

Returns the value of attribute custom_host.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def custom_host
  @custom_host
end

#datahub_enabledObject

Returns the value of attribute datahub_enabled.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def datahub_enabled
  @datahub_enabled
end

#datahub_endpointObject

Returns the value of attribute datahub_endpoint.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def datahub_endpoint
  @datahub_endpoint
end

#datahub_ipObject

Returns the value of attribute datahub_ip.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def datahub_ip
  @datahub_ip
end

#datahub_portObject

Returns the value of attribute datahub_port.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def datahub_port
  @datahub_port
end

#debugObject

Returns the value of attribute debug.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def debug
  @debug
end

#host_idObject

Returns the value of attribute host_id.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def host_id
  @host_id
end

#host_nameObject

Returns the value of attribute host_name.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def host_name
  @host_name
end

#host_name_enabledObject

Returns the value of attribute host_name_enabled.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def host_name_enabled
  @host_name_enabled
end

#localObject

Returns the value of attribute local.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def local
  @local
end

#queueObject

Returns the value of attribute queue.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def queue
  @queue
end

#regionObject

Returns the value of attribute region.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def region
  @region
end

#sslObject

Returns the value of attribute ssl.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def ssl
  @ssl
end

#startedObject

Returns the value of attribute started.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def started
  @started
end

#threadObject

Returns the value of attribute thread.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def thread
  @thread
end

#tokenObject

Returns the value of attribute token.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def token
  @token
end

#udp_portObject

Returns the value of attribute udp_port.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def udp_port
  @udp_port
end

#use_data_endpointObject

Returns the value of attribute use_data_endpoint.



31
32
33
# File 'lib/r7_insight/host/connection.rb', line 31

def use_data_endpoint
  @use_data_endpoint
end

Instance Method Details

#check_async_threadObject



151
152
153
# File 'lib/r7_insight/host/connection.rb', line 151

def check_async_thread
  @thread = Thread.new { run } unless @thread&.alive?
end

#closeObject



155
156
157
158
# File 'lib/r7_insight/host/connection.rb', line 155

def close
  dbg 'R7Insight: Closing asynchronous socket writer'
  @started = false
end

#close_connectionObject



240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/r7_insight/host/connection.rb', line 240

def close_connection
  begin
    if @conn.respond_to?(:sysclose)
      @conn.sysclose
    elsif @conn.respond_to?(:close)
      @conn.close
    end
  rescue StandardError
    dbg "R7Insight: couldn't close connection, close with exception -
 #{$ERROR_INFO}"
  ensure
    @conn = nil
  end
end

#dbg(message) ⇒ Object



122
123
124
# File 'lib/r7_insight/host/connection.rb', line 122

def dbg(message)
  @debug_logger.add(Logger::Severity::DEBUG, message) if @debug
end

#init_debugObject



116
117
118
119
120
# File 'lib/r7_insight/host/connection.rb', line 116

def init_debug
  file_path = 'r7insightGem.log'
  file_path = 'log/r7insightGem.log' if File.exist?('log/')
  @debug_logger = Logger.new(file_path)
end

#open_connectionObject



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/r7_insight/host/connection.rb', line 160

def open_connection
  dbg 'R7Insight: Reopening connection to R7Insight API server'

  if @use_data_endpoint
    host = @region + DATA_ENDPOINT

    port = @ssl ? DATA_PORT_SECURE : DATA_PORT_UNSECURE
  elsif @udp_port
    host = @region + DATA_ENDPOINT
    port = @udp_port
  elsif @datahub_enabled
    host = @datahub_ip
    port = @datahub_port
  else
    host = @region + DATA_ENDPOINT
    port = @ssl ? DATA_PORT_SECURE : DATA_PORT_UNSECURE
  end

  if @udp_port
    @conn = UDPSocket.new
    @conn.connect(host, port)
  else
    socket = TCPSocket.new(host, port)

    if @ssl
      cert_store = OpenSSL::X509::Store.new
      cert_store.set_default_paths

      ssl_context = OpenSSL::SSL::SSLContext.new
      ssl_context.cert_store = cert_store

      if OpenSSL::SSL::SSLContext.method_defined? :min_version=
        ssl_context.min_version = OpenSSL::SSL::TLS1_1_VERSION
      end
      if OpenSSL::SSL::SSLContext.method_defined? :max_version=
        # For older versions of openssl (prior to 1.1.1) missing support for TLSv1.3
        ssl_context.max_version = if defined?(OpenSSL::SSL::TLS1_3_VERSION)
                                    OpenSSL::SSL::TLS1_3_VERSION
                                  else
                                    OpenSSL::SSL::TLS1_2_VERSION
                                  end
      end
      ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
      ssl_socket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context)
      ssl_socket.hostname = host if ssl_socket.respond_to?(:hostname=)
      ssl_socket.sync_close = true
      Timeout.timeout(10) do
        ssl_socket.connect
      end
      @conn = ssl_socket
    else
      @conn = socket
    end
  end

  dbg 'R7Insight: Connection established'
end

#reopen_connectionObject



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/r7_insight/host/connection.rb', line 218

def reopen_connection
  close_connection
  root_delay = 0.1
  loop do
    begin
      open_connection
      break
    rescue *CONNECTION_EXCEPTIONS
      dbg "R7Insight: Unable to connect to R7Insight due to timeout
(#{$ERROR_INFO})"
    rescue StandardError
      dbg "R7Insight: Got exception in reopenConnection - #{$ERROR_INFO}"
      raise
    end
    root_delay *= 2
    root_delay = 10 if root_delay >= 10
    wait_for = (root_delay + rand(root_delay)).to_i
    dbg "R7Insight: Waiting for #{wait_for}ms"
    sleep(wait_for)
  end
end

#runObject



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/r7_insight/host/connection.rb', line 255

def run
  reopen_connection

  loop do
    data = @queue.pop
    break if data == SHUTDOWN_COMMAND

    loop do
      begin
        @conn.write(data)
      rescue *CONNECTION_EXCEPTIONS
        dbg "R7Insight: Connection timeout(#{$ERROR_INFO}), try to reopen
connection"
        reopen_connection
        next
      rescue StandardError
        dbg "R7Insight: Got exception in run loop - #{$ERROR_INFO}"
        raise
      end
      break
    end
  end

  dbg 'R7Insight: Closing Asynchronous socket writer'

  close_connection
end

#start_async_threadObject



145
146
147
148
149
# File 'lib/r7_insight/host/connection.rb', line 145

def start_async_thread
  @thread = Thread.new { run }
  dbg 'R7Insight: Asynchronous socket writer started'
  @started = true
end

#write(message) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/r7_insight/host/connection.rb', line 126

def write(message)
  message = "#{message} #{host_id}" unless host_id.empty?
  message = "#{message} #{host_name}" if host_name_enabled

  @logger_console.add(Logger::Severity::UNKNOWN, message) if @local

  @queue << if message.scan(/\n/).empty?
              "#{@token} #{message} \n"
            else
              "#{message.gsub(/^/, "#{@token} [#{random_message_id}]")}\n"
            end

  if @started
    check_async_thread
  else
    start_async_thread
  end
end