Class: CloudReactorWrapperIO::StatusUpdater

Inherits:
Object
Defined in:
lib/cloudreactor_wrapper_io/status_updater.rb

Overview

A service that reports the status of a currently running process to CloudReactor. The service sends UDP messages to a process wrapper, which then forwards the updates to CloudReactor.
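The transport described above can be sketched as a local UDP round trip. This is only an illustration, not the library's code: the listener below is a stand-in for the process wrapper, it binds to an ephemeral port rather than the default 2373, and the field values are made up. The newline-terminated JSON framing follows the `send_update` source shown further down.

```ruby
require 'json'
require 'socket'

# Stand-in for the process wrapper: a UDP listener on an ephemeral local port
# (an assumption for this sketch; the real wrapper listens on its own port).
listener = UDPSocket.new
listener.bind('127.0.0.1', 0)
port = listener.local_address.ip_port

# Sender side: one JSON object per datagram, terminated by a newline.
sender = UDPSocket.new
message = { success_count: 10, expected_count: 100 }.to_json + "\n"
sender.send(message, 0, '127.0.0.1', port)

# Wait up to two seconds for the datagram, then decode it.
received = nil
if IO.select([listener], nil, nil, 2)
  data, _addr = listener.recvfrom(65_535)
  received = JSON.parse(data)
end

sender.close
listener.close
```

Because UDP is connectionless, the sender gets no acknowledgement; the sketch only knows the message arrived because it also owns the listener.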

Constant Summary

DEFAULT_STATUS_UPDATE_PORT = 2373

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(enabled: nil, port: nil, logger: nil) ⇒ StatusUpdater

Create a new instance.

Parameters:

  • enabled (Boolean, nil) (defaults to: nil)

    true to enable updates. If nil, updates are enabled only when the PROC_WRAPPER_ENABLE_STATUS_UPDATE_LISTENER environment variable is set to TRUE (compared case-insensitively).

  • port (Integer, nil) (defaults to: nil)

    the port number to use. If nil, uses the value of the PROC_WRAPPER_STATUS_UPDATE_SOCKET_PORT environment variable, or 2373 if that variable is not set. The port is only resolved when updates are enabled.

  • logger (Logger, nil) (defaults to: nil)

    the logger to use. If nil, uses Rails.logger if Rails is defined; otherwise creates a new Logger that writes to STDOUT.
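The fallback rules for `enabled` and `port` can be summarized in a small sketch. `resolve_settings` is a hypothetical helper written for this illustration, not part of the library; it takes the environment as a plain hash so the rules can be tried without touching `ENV`.

```ruby
DEFAULT_STATUS_UPDATE_PORT = 2373

# Hypothetical helper (not part of the library) that mirrors how the
# constructor resolves its settings from arguments and the environment.
def resolve_settings(enabled: nil, port: nil, env: ENV)
  if enabled.nil?
    # Only the string TRUE, in any letter case, enables updates.
    flag = env['PROC_WRAPPER_ENABLE_STATUS_UPDATE_LISTENER'] || 'FALSE'
    enabled = flag.upcase == 'TRUE'
  end

  # The constructor returns early when disabled, so the port stays nil.
  return { enabled: false, port: nil } unless enabled

  if port.nil?
    port = (env['PROC_WRAPPER_STATUS_UPDATE_SOCKET_PORT'] ||
      DEFAULT_STATUS_UPDATE_PORT).to_i
  end

  { enabled: true, port: port }
end
```

An explicit argument always wins over the environment, and the environment wins over the built-in default.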



# File 'lib/cloudreactor_wrapper_io/status_updater.rb', line 38

def initialize(enabled: nil, port: nil, logger: nil)
  @logger = logger

  unless @logger
    if defined?(Rails)
      @logger = Rails.logger
    else
      @logger = Logger.new(STDOUT)
    end
  end

  @socket = nil
  @port = nil

  if enabled.nil?
    @enabled = (ENV['PROC_WRAPPER_ENABLE_STATUS_UPDATE_LISTENER'] ||
      'FALSE').upcase == 'TRUE'
  else
    @enabled = enabled
  end

  if @enabled
    @logger.info('ProcessStatusUpdater is enabled')
  else
    @logger.info('ProcessStatusUpdater is disabled')
    return
  end

  if port.nil?
    @port = (ENV['PROC_WRAPPER_STATUS_UPDATE_SOCKET_PORT'] ||
      DEFAULT_STATUS_UPDATE_PORT).to_i
  else
    @port = port
  end

  at_exit do
    @logger.info('Shutting down status update socket ...')
    begin
      if @socket
        @socket.shutdown
        @socket = nil
        @logger.info('Done shutting down status update socket.')
      else
        @logger.info('No socket to close.')
      end
    rescue => ex
      @logger.info('Error shutting down status update socket.')
    end
  end
end

Instance Attribute Details

#enabled ⇒ Object (readonly)

Returns the value of attribute enabled.



# File 'lib/cloudreactor_wrapper_io/status_updater.rb', line 27

def enabled
  @enabled
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



# File 'lib/cloudreactor_wrapper_io/status_updater.rb', line 28

def port
  @port
end

Instance Method Details

#close ⇒ Object

Close any resources associated with this instance.



# File 'lib/cloudreactor_wrapper_io/status_updater.rb', line 152

def close
  if @socket
    @socket.shutdown
    @socket = nil
  end
end

#send_update(success_count: nil, error_count: nil, skipped_count: nil, expected_count: nil, last_status_message: nil, extra_props: nil) ⇒ Boolean

Sends an update message to the process wrapper. The process wrapper should merge updates together until it is time to send a heartbeat to the server, so it should be safe to call this method rapidly. Because messages are sent over UDP, delivery is not guaranteed, but in practice they almost always arrive.

Parameters:

  • success_count (Integer, nil) (defaults to: nil)

    The number of successful items so far

  • error_count (Integer, nil) (defaults to: nil)

    The number of failed items so far

  • skipped_count (Integer, nil) (defaults to: nil)

    The number of skipped items so far

  • expected_count (Integer, nil) (defaults to: nil)

    The number of expected items

  • last_status_message (String, nil) (defaults to: nil)

    A status message

  • extra_props (Hash, nil) (defaults to: nil)

    Additional properties to send

Returns:

  • (Boolean)

    true if the update was sent; false if updates are disabled, no values were given, or sending failed
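The way the parameters combine into a single message can be sketched as follows. `build_status_message` is a hypothetical helper for illustration only; it follows the rules visible in the method source (nil arguments are omitted, `extra_props` is merged last, and an empty result means nothing is sent) but is not the library's own code.

```ruby
require 'json'

# Hypothetical helper (not part of the library) that builds the
# newline-terminated JSON message, or returns nil if there is nothing to send.
def build_status_message(success_count: nil, error_count: nil,
                         skipped_count: nil, expected_count: nil,
                         last_status_message: nil, extra_props: nil)
  # Drop every argument that was left as nil; zero counts are kept.
  status = {
    success_count: success_count,
    error_count: error_count,
    skipped_count: skipped_count,
    expected_count: expected_count,
    last_status_message: last_status_message
  }.compact

  # Extra properties are merged last, so they can override the named fields.
  status.merge!(extra_props) if extra_props

  return nil if status.empty?

  status.to_json + "\n"
end
```

For example, `build_status_message(success_count: 3, extra_props: { 'batch' => 'a' })` yields a one-line JSON object containing only those two keys.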



# File 'lib/cloudreactor_wrapper_io/status_updater.rb', line 103

def send_update(success_count: nil, error_count: nil, skipped_count: nil,
  expected_count: nil, last_status_message: nil, extra_props: nil)
  unless @enabled
    return false
  end

  status_hash = {}

  unless success_count.nil?
    status_hash[:success_count] = success_count
  end

  unless error_count.nil?
    status_hash[:error_count] = error_count
  end

  unless skipped_count.nil?
    status_hash[:skipped_count] = skipped_count
  end

  unless expected_count.nil?
    status_hash[:expected_count] = expected_count
  end

  unless last_status_message.nil?
    status_hash[:last_status_message] = last_status_message
  end

  if extra_props
    status_hash.merge!(extra_props)
  end

  if status_hash.empty?
    return false
  end

  message = status_hash.to_json + "\n"

  begin
    socket.send(message, 0, '127.0.0.1', @port)
    true
  rescue => ex
    @logger.debug("Can't send status update, resetting socket")
    @socket = nil
    false
  end
end