Class: GoodJob::Notifier
- Inherits:
-
Object
- Object
- GoodJob::Notifier
- Includes:
- ActiveSupport::Callbacks, ProcessHeartbeat
- Defined in:
- lib/good_job/notifier.rb,
lib/good_job/notifier/process_heartbeat.rb
Overview
:nodoc:
Defined Under Namespace
Modules: ProcessHeartbeat
Constant Summary collapse
- CHANNEL =
Default Postgres channel for LISTEN/NOTIFY
'good_job'- WAIT_INTERVAL =
Seconds to block while LISTENing for a message
1- RECONNECT_INTERVAL =
Seconds to wait if database cannot be connected to
5- CONNECTION_ERRORS_REPORTING_THRESHOLD =
Number of consecutive connection errors before reporting an error
6- KEEPALIVE_INTERVAL =
Interval for emitting a noop SQL query to keep the connection alive
10- CONNECTION_ERRORS =
Connection errors that will wait RECONNECT_INTERVAL before reconnecting
%w[ ActiveRecord::ConnectionNotEstablished ActiveRecord::StatementInvalid PG::UnableToSend PG::Error ].freeze
Class Attribute Summary collapse
-
.connection ⇒ ActiveRecord::ConnectionAdapters::AbstractAdapter?
ActiveRecord Connection that has been established for the Notifier.
-
.instances ⇒ Array<GoodJob::Notifier>?
readonly
List of all instantiated Notifiers in the current process.
Instance Attribute Summary collapse
-
#recipients ⇒ Array<#call, Array(Object, Symbol)>
readonly
List of recipients that will receive notifications.
Class Method Summary collapse
-
.notify(message) ⇒ Object
Send a message via Postgres NOTIFY.
Instance Method Summary collapse
-
#connected?(timeout: nil) ⇒ true, ...
Tests whether the notifier is active and has acquired a dedicated database connection.
-
#initialize(*recipients, enable_listening: true, capsule: GoodJob.capsule, executor: Concurrent.global_io_executor) ⇒ Notifier
constructor
A new instance of Notifier.
-
#listening?(timeout: nil) ⇒ true, ...
Tests whether the notifier is listening for new messages.
-
#restart(timeout: -1)) ⇒ void
Restart the notifier.
- #running? ⇒ Boolean
-
#shutdown(timeout: -1)) ⇒ void
Shut down the notifier.
- #shutdown? ⇒ Boolean
Methods included from ProcessHeartbeat
#deregister_process, #refresh_process, #register_process
Constructor Details
#initialize(*recipients, enable_listening: true, capsule: GoodJob.capsule, executor: Concurrent.global_io_executor) ⇒ Notifier
Returns a new instance of Notifier.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/good_job/notifier.rb', line 69 def initialize(*recipients, enable_listening: true, capsule: GoodJob.capsule, executor: Concurrent.global_io_executor) @recipients = Concurrent::Array.new(recipients) @enable_listening = enable_listening @executor = executor @monitor = Monitor.new @shutdown_event = Concurrent::Event.new.tap(&:set) @running = Concurrent::AtomicBoolean.new(false) @connected = Concurrent::Event.new @listening = Concurrent::Event.new @connection_errors_count = Concurrent::AtomicFixnum.new(0) @connection_errors_reported = Concurrent::AtomicBoolean.new(false) @enable_listening = enable_listening @task = nil @capsule = capsule @last_keepalive_time = Time.current start self.class.instances << self end |
Class Attribute Details
.connection ⇒ ActiveRecord::ConnectionAdapters::AbstractAdapter?
ActiveRecord Connection that has been established for the Notifier.
52 |
# File 'lib/good_job/notifier.rb', line 52 thread_cattr_accessor :connection |
.instances ⇒ Array<GoodJob::Notifier>? (readonly)
List of all instantiated Notifiers in the current process.
46 |
# File 'lib/good_job/notifier.rb', line 46 cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false |
Instance Attribute Details
#recipients ⇒ Array<#call, Array(Object, Symbol)> (readonly)
List of recipients that will receive notifications.
65 66 67 |
# File 'lib/good_job/notifier.rb', line 65 def recipients @recipients end |
Class Method Details
.notify(message) ⇒ Object
Send a message via Postgres NOTIFY
56 57 58 59 60 61 |
# File 'lib/good_job/notifier.rb', line 56 def self.notify() connection = ::GoodJob::Job.connection connection.exec_query " NOTIFY \#{CHANNEL}, \#{connection.quote(message.to_json)}\n SQL\nend\n".squish |
Instance Method Details
#connected?(timeout: nil) ⇒ true, ...
Tests whether the notifier is active and has acquired a dedicated database connection.
97 98 99 100 101 102 103 |
# File 'lib/good_job/notifier.rb', line 97 def connected?(timeout: nil) if timeout.nil? @connected.set? else @connected.wait(timeout&.negative? ? nil : timeout) end end |
#listening?(timeout: nil) ⇒ true, ...
Tests whether the notifier is listening for new messages.
108 109 110 111 112 113 114 |
# File 'lib/good_job/notifier.rb', line 108 def listening?(timeout: nil) if timeout.nil? @listening.set? else @listening.wait(timeout&.negative? ? nil : timeout) end end |
#restart(timeout: -1)) ⇒ void
This method returns an undefined value.
Restart the notifier. When shutdown, start; or shutdown and start.
149 150 151 152 153 154 |
# File 'lib/good_job/notifier.rb', line 149 def restart(timeout: -1) synchronize do shutdown(timeout: timeout) unless @shutdown_event.set? start end end |
#running? ⇒ Boolean
90 91 92 |
# File 'lib/good_job/notifier.rb', line 90 def running? @executor.running? && @running.true? end |
#shutdown(timeout: -1)) ⇒ void
This method returns an undefined value.
Shut down the notifier. This stops the background LISTENing thread. Use #shutdown? to determine whether threads have stopped.
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/good_job/notifier.rb', line 128 def shutdown(timeout: -1) synchronize do @running.make_false if @executor.shutdown? || @task&.complete? # clean up in the even the executor is killed @connected.reset @listening.reset @shutdown_event.set else @shutdown_event.wait(timeout&.negative? ? nil : timeout) unless timeout.nil? @connected.reset if @shutdown_event.set? end @shutdown_event.set? end end |
#shutdown? ⇒ Boolean
116 117 118 |
# File 'lib/good_job/notifier.rb', line 116 def shutdown? @shutdown_event.set? end |