Class: GoodJob::Notifier

Inherits:
Object
  • Object
show all
Includes:
ActiveSupport::Callbacks, ProcessHeartbeat
Defined in:
lib/good_job/notifier.rb,
lib/good_job/notifier/process_heartbeat.rb

Overview

:nodoc:

Defined Under Namespace

Modules: ProcessHeartbeat

Constant Summary collapse

CHANNEL =

Default Postgres channel for LISTEN/NOTIFY

'good_job'
WAIT_INTERVAL =

Seconds to block while LISTENing for a message

1
RECONNECT_INTERVAL =

Seconds to wait if database cannot be connected to

5
CONNECTION_ERRORS_REPORTING_THRESHOLD =

Number of consecutive connection errors before reporting an error

6
KEEPALIVE_INTERVAL =

Interval for emitting a noop SQL query to keep the connection alive

10
CONNECTION_ERRORS =

Connection errors that will wait RECONNECT_INTERVAL before reconnecting

%w[
  ActiveRecord::ConnectionNotEstablished
  ActiveRecord::StatementInvalid
  PG::UnableToSend
  PG::Error
].freeze

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ProcessHeartbeat

#deregister_process, #refresh_process, #register_process

Constructor Details

#initialize(*recipients, enable_listening: true, capsule: GoodJob.capsule, executor: Concurrent.global_io_executor) ⇒ Notifier

Returns a new instance of Notifier.

Parameters:

  • recipients (Array<#call, Array(Object, Symbol)>)
  • enable_listening (true, false) (defaults to: true)


68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/good_job/notifier.rb', line 68

# Builds a notifier, starts it, and registers it in the class-level list of instances.
#
# @param recipients [Array<#call, Array(Object, Symbol)>] wrapped in a +Concurrent::Array+
#   and exposed through #recipients
# @param enable_listening [true, false] stored on the instance; consumed outside this method
# @param capsule [Object] stored on the instance; defaults to +GoodJob.capsule+
# @param executor [Object] stored on the instance and queried by #running? and #shutdown;
#   defaults to +Concurrent.global_io_executor+
def initialize(*recipients, enable_listening: true, capsule: GoodJob.capsule, executor: Concurrent.global_io_executor)
  @recipients = Concurrent::Array.new(recipients)
  @enable_listening = enable_listening
  @executor = executor
  @capsule = capsule

  @monitor = Monitor.new
  # The shutdown event begins in the "set" state, so #shutdown? is true
  # until the notifier is started.
  @shutdown_event = Concurrent::Event.new.tap(&:set)
  @running = Concurrent::AtomicBoolean.new(false)
  @connected = Concurrent::Event.new
  @listening = Concurrent::Event.new
  @connection_errors_count = Concurrent::AtomicFixnum.new(0)
  @connection_errors_reported = Concurrent::AtomicBoolean.new(false)
  @task = nil
  @last_keepalive_time = Time.current

  start
  self.class.instances << self
end

Class Attribute Details

.connection ⇒ ActiveRecord::ConnectionAdapters::AbstractAdapter?

ActiveRecord Connection that has been established for the Notifier.

Returns:

  • (ActiveRecord::ConnectionAdapters::AbstractAdapter, nil)


51
# File 'lib/good_job/notifier.rb', line 51

# @!attribute [rw] connection
#   @!scope class
#   ActiveRecord Connection that has been established for the Notifier.
#   @return [ActiveRecord::ConnectionAdapters::AbstractAdapter, nil]
thread_cattr_accessor :connection

.instances ⇒ Array<GoodJob::Notifier>? (readonly)

List of all instantiated Notifiers in the current process.

Returns:

  • (Array<GoodJob::Notifier>, nil)

45
# File 'lib/good_job/notifier.rb', line 45

# @!attribute [r] instances
#   @!scope class
#   List of all instantiated Notifiers in the current process.
#   Backed by a Concurrent::Array; no instance-level reader is generated.
#   @return [Array<GoodJob::Notifier>, nil]
cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false

Instance Attribute Details

#recipients ⇒ Array<#call, Array(Object, Symbol)> (readonly)

List of recipients that will receive notifications.

Returns:

  • (Array<#call, Array(Object, Symbol)>)


64
65
66
# File 'lib/good_job/notifier.rb', line 64

# List of recipients that will receive notifications.
#
# @return [Array<#call, Array(Object, Symbol)>] the collection held in +@recipients+
def recipients
  @recipients
end

Class Method Details

.notify(message) ⇒ Object

Send a message via Postgres NOTIFY

Parameters:

  • message (#to_json)


55
56
57
58
59
60
# File 'lib/good_job/notifier.rb', line 55

# Send a message via Postgres NOTIFY on the CHANNEL channel.
#
# The statement is assembled directly as a single line instead of being
# passed through +squish+: squishing the finished SQL would also collapse
# runs of whitespace inside the quoted JSON payload and silently alter the
# message that listeners receive.
#
# @param message [#to_json] payload; serialized to JSON and quoted by the connection
# @return [Object] whatever the connection's +exec_query+ returns
def self.notify(message)
  connection = ::GoodJob::Job.connection
  payload = connection.quote(message.to_json)
  connection.exec_query("NOTIFY #{CHANNEL}, #{payload}")
end

Instance Method Details

#connected?(timeout: nil) ⇒ true, ...

Tests whether the notifier is active and has acquired a dedicated database connection.

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    Seconds to wait for condition to be true, -1 is forever

Returns:

  • (true, false, nil)


96
97
98
99
100
101
102
# File 'lib/good_job/notifier.rb', line 96

# Tests whether the notifier has acquired its dedicated database connection.
#
# Without a timeout this is a non-blocking check of the connected event.
# With a timeout it waits on that event instead.
#
# @param timeout [Numeric, nil] seconds to wait for the event; a negative
#   value is translated to +nil+ for the wait call (documented as "forever")
# @return [true, false, nil]
def connected?(timeout: nil)
  return @connected.set? if timeout.nil?

  wait_seconds = timeout.negative? ? nil : timeout
  @connected.wait(wait_seconds)
end

#listening?(timeout: nil) ⇒ true, ...

Tests whether the notifier is listening for new messages.

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    Seconds to wait for condition to be true, -1 is forever

Returns:

  • (true, false, nil)


107
108
109
110
111
112
113
# File 'lib/good_job/notifier.rb', line 107

# Tests whether the notifier is listening for new messages.
#
# Without a timeout this is a non-blocking check of the listening event.
# With a timeout it waits on that event instead.
#
# @param timeout [Numeric, nil] seconds to wait for the event; a negative
#   value is translated to +nil+ for the wait call (documented as "forever")
# @return [true, false, nil]
def listening?(timeout: nil)
  return @listening.set? if timeout.nil?

  wait_seconds = timeout.negative? ? nil : timeout
  @listening.wait(wait_seconds)
end

#restart(timeout: -1) ⇒ void

This method returns an undefined value.

Restart the notifier. If it is already shut down, start it; otherwise shut it down and then start it.

Parameters:

  • timeout (nil, Numeric) (defaults to: -1)

    Seconds to wait; shares same values as #shutdown.



148
149
150
151
152
153
# File 'lib/good_job/notifier.rb', line 148

# Restart the notifier: shut it down first unless the shutdown event is
# already set, then start it. Both steps run inside one synchronize block.
#
# @param timeout [nil, Numeric] forwarded unchanged to #shutdown
# @return [void]
def restart(timeout: -1)
  synchronize do
    already_shut_down = @shutdown_event.set?
    shutdown(timeout: timeout) unless already_shut_down
    start
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


89
90
91
# File 'lib/good_job/notifier.rb', line 89

# Tests whether the notifier is running: the executor must report that it is
# running and the notifier's own running flag must be true.
#
# @return [Boolean]
def running?
  executor_running = @executor.running?
  executor_running && @running.true?
end

#shutdown(timeout: -1) ⇒ void

This method returns an undefined value.

Shut down the notifier. This stops the background LISTENing thread. Use #shutdown? to determine whether threads have stopped.

Parameters:

  • timeout (Numeric, nil) (defaults to: -1)

    Seconds to wait for active threads.

    • nil, the notifier will trigger a shutdown but not wait for it to complete.

    • -1, the notifier will wait until the shutdown is complete.

    • A positive number will wait that many seconds before stopping any remaining active threads.



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/good_job/notifier.rb', line 127

# Shut down the notifier by clearing its running flag and, depending on
# +timeout+, waiting for the shutdown event to be set.
#
# @param timeout [Numeric, nil] seconds to wait for the shutdown event:
#   +nil+ does not wait at all, a negative value waits with no time limit,
#   any other number is passed to the wait call as-is
# @return [Boolean] whether the shutdown event is set when the method returns
def shutdown(timeout: -1)
  synchronize do
    @running.make_false

    if @executor.shutdown? || @task&.complete?
      # Clean up in the event the executor is killed: nothing else will
      # reset these events or signal shutdown, so do it here.
      [@connected, @listening].each(&:reset)
      @shutdown_event.set
    else
      unless timeout.nil?
        wait_seconds = timeout.negative? ? nil : timeout
        @shutdown_event.wait(wait_seconds)
      end
      # Only drop the connected state once shutdown has actually completed.
      @connected.reset if @shutdown_event.set?
    end

    @shutdown_event.set?
  end
end

#shutdown? ⇒ Boolean

Returns:

  • (Boolean)


115
116
117
# File 'lib/good_job/notifier.rb', line 115

# Tests whether the notifier is shut down, i.e. whether its shutdown event is set.
#
# @return [Boolean]
def shutdown?
  @shutdown_event.set?
end