Class: GoodJob::Notifier
- Inherits:
-
Object
- Object
- GoodJob::Notifier
- Includes:
- ActiveSupport::Callbacks, ProcessHeartbeat
- Defined in:
- lib/good_job/notifier.rb,
lib/good_job/notifier/process_heartbeat.rb
Overview
:nodoc:
Defined Under Namespace
Modules: ProcessHeartbeat
Constant Summary collapse
- CHANNEL =
Default Postgres channel for LISTEN/NOTIFY
'good_job'
- WAIT_INTERVAL =
Seconds to block while LISTENing for a message
1
- RECONNECT_INTERVAL =
Seconds to wait if database cannot be connected to
5
- CONNECTION_ERRORS_REPORTING_THRESHOLD =
Number of consecutive connection errors before reporting an error
6
- KEEPALIVE_INTERVAL =
Interval for emitting a noop SQL query to keep the connection alive
10
- CONNECTION_ERRORS =
Connection errors that will wait RECONNECT_INTERVAL before reconnecting
%w[ ActiveRecord::ConnectionNotEstablished ActiveRecord::StatementInvalid PG::UnableToSend PG::Error ].freeze
Class Attribute Summary collapse
-
.connection ⇒ ActiveRecord::ConnectionAdapters::AbstractAdapter?
ActiveRecord Connection that has been established for the Notifier.
-
.instances ⇒ Array<GoodJob::Notifier>?
readonly
List of all instantiated Notifiers in the current process.
Instance Attribute Summary collapse
-
#recipients ⇒ Array<#call, Array(Object, Symbol)>
readonly
List of recipients that will receive notifications.
Class Method Summary collapse
-
.notify(message) ⇒ Object
Send a message via Postgres NOTIFY.
Instance Method Summary collapse
-
#connected?(timeout: nil) ⇒ true, ...
Tests whether the notifier is active and has acquired a dedicated database connection.
-
#initialize(*recipients, enable_listening: true, capsule: GoodJob.capsule, executor: Concurrent.global_io_executor) ⇒ Notifier
constructor
A new instance of Notifier.
-
#listening?(timeout: nil) ⇒ true, ...
Tests whether the notifier is listening for new messages.
-
#restart(timeout: -1)) ⇒ void
Restart the notifier.
- #running? ⇒ Boolean
-
#shutdown(timeout: -1)) ⇒ void
Shut down the notifier.
- #shutdown? ⇒ Boolean
Methods included from ProcessHeartbeat
#deregister_process, #refresh_process, #register_process
Constructor Details
#initialize(*recipients, enable_listening: true, capsule: GoodJob.capsule, executor: Concurrent.global_io_executor) ⇒ Notifier
Returns a new instance of Notifier.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/good_job/notifier.rb', line 68 def initialize(*recipients, enable_listening: true, capsule: GoodJob.capsule, executor: Concurrent.global_io_executor) @recipients = Concurrent::Array.new(recipients) @enable_listening = enable_listening @executor = executor @monitor = Monitor.new @shutdown_event = Concurrent::Event.new.tap(&:set) @running = Concurrent::AtomicBoolean.new(false) @connected = Concurrent::Event.new @listening = Concurrent::Event.new @connection_errors_count = Concurrent::AtomicFixnum.new(0) @connection_errors_reported = Concurrent::AtomicBoolean.new(false) @enable_listening = enable_listening @task = nil @capsule = capsule @last_keepalive_time = Time.current start self.class.instances << self end |
Class Attribute Details
.connection ⇒ ActiveRecord::ConnectionAdapters::AbstractAdapter?
ActiveRecord Connection that has been established for the Notifier.
51 |
# File 'lib/good_job/notifier.rb', line 51 thread_cattr_accessor :connection |
.instances ⇒ Array<GoodJob::Notifier>? (readonly)
List of all instantiated Notifiers in the current process.
45 |
# File 'lib/good_job/notifier.rb', line 45 cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false |
Instance Attribute Details
#recipients ⇒ Array<#call, Array(Object, Symbol)> (readonly)
List of recipients that will receive notifications.
64 65 66 |
# File 'lib/good_job/notifier.rb', line 64 def recipients @recipients end |
Class Method Details
Instance Method Details
#connected?(timeout: nil) ⇒ true, ...
Tests whether the notifier is active and has acquired a dedicated database connection.
96 97 98 99 100 101 102 |
# File 'lib/good_job/notifier.rb', line 96 def connected?(timeout: nil) if timeout.nil? @connected.set? else @connected.wait(timeout&.negative? ? nil : timeout) end end |
#listening?(timeout: nil) ⇒ true, ...
Tests whether the notifier is listening for new messages.
107 108 109 110 111 112 113 |
# File 'lib/good_job/notifier.rb', line 107 def listening?(timeout: nil) if timeout.nil? @listening.set? else @listening.wait(timeout&.negative? ? nil : timeout) end end |
#restart(timeout: -1)) ⇒ void
This method returns an undefined value.
Restart the notifier. When shutdown, start; or shutdown and start.
148 149 150 151 152 153 |
# File 'lib/good_job/notifier.rb', line 148 def restart(timeout: -1) synchronize do shutdown(timeout: timeout) unless @shutdown_event.set? start end end |
#running? ⇒ Boolean
89 90 91 |
# File 'lib/good_job/notifier.rb', line 89 def running? @executor.running? && @running.true? end |
#shutdown(timeout: -1)) ⇒ void
This method returns an undefined value.
Shut down the notifier. This stops the background LISTENing thread. Use #shutdown? to determine whether threads have stopped.
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/good_job/notifier.rb', line 127 def shutdown(timeout: -1) synchronize do @running.make_false if @executor.shutdown? || @task&.complete? # clean up in the even the executor is killed @connected.reset @listening.reset @shutdown_event.set else @shutdown_event.wait(timeout&.negative? ? nil : timeout) unless timeout.nil? @connected.reset if @shutdown_event.set? end @shutdown_event.set? end end |
#shutdown? ⇒ Boolean
115 116 117 |
# File 'lib/good_job/notifier.rb', line 115 def shutdown? @shutdown_event.set? end |