Class: GoodJob::Notifier

Inherits:
Object
Defined in:
lib/good_job/notifier.rb

Overview

Notifiers hook into Postgres LISTEN/NOTIFY functionality to emit and listen for notifications across processes.

Notifiers can emit NOTIFY messages through Postgres. A notifier will LISTEN for messages by creating a background thread that runs in an instance of Concurrent::ThreadPoolExecutor. When a message is received, the notifier passes the message to each of its recipients.
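The recipient handling described above can be sketched in plain Ruby. This is a minimal illustration, not GoodJob's implementation: `dispatch` and `collector` are hypothetical names, and the only thing taken from the documentation is that a recipient is either something that responds to `call` or an `[object, method_name]` pair.

```ruby
# Hypothetical helper, not part of GoodJob: delivers one message to every
# recipient, covering the two recipient shapes the Notifier documents.
def dispatch(recipients, message)
  recipients.each do |recipient|
    if recipient.respond_to?(:call)
      # Callable recipient, e.g. a lambda or a Method object.
      recipient.call(message)
    else
      # Pair recipient: [object, method_name].
      receiver, method_name = recipient
      receiver.public_send(method_name, message)
    end
  end
end

received = []

# A stand-in object that records what it is sent.
collector = Object.new
collector.define_singleton_method(:wake) { |msg| received << [:wake, msg] }

recipients = [
  ->(msg) { received << [:lambda, msg] },
  [collector, :wake]
]

dispatch(recipients, { "queue_name" => "default" })
```

After the call, `received` holds one entry per recipient, in the order the recipients were listed.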

Constant Summary

Default Postgres channel for LISTEN/NOTIFY

CHANNEL = 'good_job'.freeze

Defaults for instance of Concurrent::ThreadPoolExecutor

POOL_OPTIONS = {
  name: name,
  min_threads: 0,
  max_threads: 1,
  auto_terminate: true,
  idletime: 60,
  max_queue: 1,
  fallback_policy: :discard,
}.freeze

Seconds to block while LISTENing for a message

WAIT_INTERVAL = 1

Constructor Details

#initialize(*recipients) ⇒ Notifier

Returns a new instance of Notifier.

Parameters:

  • recipients (Array<#call, Array(Object, Symbol)>)


# File 'lib/good_job/notifier.rb', line 47

def initialize(*recipients)
  @recipients = Concurrent::Array.new(recipients)
  @listening = Concurrent::AtomicBoolean.new(false)

  self.class.instances << self

  create_pool
  listen
end

Class Attribute Details

.instances ⇒ Array<GoodJob::Notifier> (readonly)

List of all instantiated Notifiers in the current process.

Returns:

  • (Array<GoodJob::Notifier>)


# File 'lib/good_job/notifier.rb', line 31

cattr_reader :instances, default: [], instance_reader: false
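A process-wide registry like this can be illustrated without ActiveSupport. The sketch below is an assumption-laden stand-in: `DemoNotifier` is a hypothetical class, and it uses a class-level instance variable rather than `cattr_reader`, so unlike the real attribute it would not be shared with subclasses.

```ruby
# Hypothetical stand-in for the registry pattern; not GoodJob code.
class DemoNotifier
  # Class-level list of every instance created in this process.
  @instances = []

  class << self
    # Reader on the class only, similar in spirit to instance_reader: false.
    attr_reader :instances
  end

  def initialize
    # Each new object registers itself, as Notifier#initialize does.
    self.class.instances << self
  end
end

first = DemoNotifier.new
second = DemoNotifier.new
registered = DemoNotifier.instances
```

Here `registered` contains `first` and `second` in creation order, and the instances themselves do not respond to `instances`.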

Instance Attribute Details

#recipients ⇒ Array<#call, Array(Object, Symbol)> (readonly)

List of recipients that will receive notifications.

Returns:

  • (Array<#call, Array(Object, Symbol)>)


# File 'lib/good_job/notifier.rb', line 44

def recipients
  @recipients
end

Class Method Details

.notify(message) ⇒ Object

Send a message via Postgres NOTIFY

Parameters:

  • message (#to_json)


# File 'lib/good_job/notifier.rb', line 35

def self.notify(message)
  connection = ActiveRecord::Base.connection
  connection.exec_query <<~SQL.squish
    NOTIFY #{CHANNEL}, #{connection.quote(message.to_json)}
  SQL
end
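To see what statement this method sends, the string construction can be reproduced without a database. This is a sketch under stated assumptions: `DEMO_CHANNEL`, `quote_literal`, and `notify_sql` are hypothetical names, and `quote_literal` is a simplified stand-in for `connection.quote` that only doubles single quotes. Real code should rely on the adapter's quoting.

```ruby
require "json"

# Mirrors the documented CHANNEL value; renamed to mark it as illustrative.
DEMO_CHANNEL = "good_job"

# Simplified stand-in for ActiveRecord's connection.quote (assumption):
# wraps the string in single quotes and doubles any embedded single quote.
def quote_literal(string)
  "'" + string.gsub("'", "''") + "'"
end

# Builds the NOTIFY statement: the message is serialized to JSON first,
# then quoted as a SQL string literal.
def notify_sql(message)
  "NOTIFY #{DEMO_CHANNEL}, #{quote_literal(message.to_json)}"
end

sql = notify_sql({ queue_name: "default" })
puts sql
```

Any object that responds to `to_json` can be passed as the message. The payload travels as a JSON string, so a listener would need to parse it before handing it to recipients.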

Instance Method Details

#listening? ⇒ true, false, nil

Tests whether the notifier is active and listening for new messages.

Returns:

  • (true, false, nil)


# File 'lib/good_job/notifier.rb', line 59

def listening?
  @listening.true?
end

#restart(wait: true) ⇒ void

This method returns an undefined value.

Restart the notifier. If the notifier is already shut down, it is simply started; otherwise it is shut down first and then started again.

Parameters:

  • wait (Boolean) (defaults to: true)

    Wait for background thread to finish



# File 'lib/good_job/notifier.rb', line 67

def restart(wait: true)
  shutdown(wait: wait)
  create_pool
  listen
end

#shutdown(wait: true) ⇒ void

This method returns an undefined value.

Shut down the notifier. This stops the background LISTENing thread. If wait is true, the notifier will wait for the background thread to shut down. If wait is false, this method will return immediately even though threads may still be running. Use #shutdown? to determine whether threads have stopped.

Parameters:

  • wait (Boolean) (defaults to: true)

    Wait for actively executing threads to finish



# File 'lib/good_job/notifier.rb', line 80

def shutdown(wait: true)
  return unless @pool.running?

  @pool.shutdown
  @pool.wait_for_termination if wait
end
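The difference between `wait: true` and `wait: false` can be illustrated with a plain Ruby thread. This is only a sketch of the blocking behavior: `DemoListener` is a hypothetical class built on `Thread` rather than `Concurrent::ThreadPoolExecutor`, and its `shutdown?` checks whether the thread has actually exited, which is an assumption made for the demonstration.

```ruby
# Hypothetical stand-in for a background listener; not GoodJob code.
class DemoListener
  def initialize
    @running = true
    # Background loop that keeps going until asked to stop.
    @thread = Thread.new do
      sleep 0.01 while @running
    end
  end

  # Signal the loop to stop; optionally block until the thread exits.
  def shutdown(wait: true)
    @running = false
    @thread.join if wait
  end

  # True once the background thread has finished.
  def shutdown?
    !@thread.alive?
  end
end

# With wait: true the thread has finished by the time shutdown returns.
blocking = DemoListener.new
blocking.shutdown(wait: true)
stopped_after_wait = blocking.shutdown?

# With wait: false the call returns immediately, so the caller polls.
async = DemoListener.new
async.shutdown(wait: false)
sleep 0.01 until async.shutdown?
```

With `wait: false`, whether the thread has already stopped right after the call depends on timing, which is why the caller polls `shutdown?` instead of assuming a result.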

#shutdown? ⇒ true, false, nil

Tests whether the notifier is shut down.

Returns:

  • (true, false, nil)


# File 'lib/good_job/notifier.rb', line 89

def shutdown?
  !@pool.running?
end