Class: Exekutor::Internal::Listener

Inherits:
Object
  • Object
show all
Includes:
Executable, Logger
Defined in:
lib/exekutor/internal/listener.rb

Overview

Listens for jobs to be executed

Defined Under Namespace

Classes: Error, JobParser, UnsupportedDatabase

Constant Summary collapse

JOB_ENQUEUED_CHANNEL =

The PG notification channel for enqueued jobs

"exekutor::job_enqueued"
PROVIDER_CHANNEL =

The PG notification channel for a worker. Must be formatted with the worker ID.

"exekutor::worker::%s"

Constants included from Executable

Executable::STATES

Instance Method Summary collapse

Methods included from Executable

#consecutive_errors, #restart_delay, #running?, #state

Constructor Details

#initialize(worker_id:, provider:, pool:, queues: nil, min_priority: nil, max_priority: nil, wait_timeout: 60, set_db_connection_name: false) ⇒ Listener

Creates a new listener

Parameters:

  • worker_id (String)

    the ID of the worker

  • queues (Array<String>) (defaults to: nil)

    the queues to watch

  • provider (Provider)

    the job provider

  • pool (ThreadPoolExecutor)

    the thread pool to use

  • wait_timeout (Integer) (defaults to: 60)

    the time to listen for notifications

  • set_db_connection_name (Boolean) (defaults to: false)

    whether to set the application name on the DB connection



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/exekutor/internal/listener.rb', line 27

def initialize(worker_id:, provider:, pool:, queues: nil, min_priority: nil, max_priority: nil, wait_timeout: 60,
               set_db_connection_name: false)
  super()
  @config = {
    worker_id: worker_id,
    queues: queues.presence,
    min_priority: min_priority,
    max_priority: max_priority,
    wait_timeout: wait_timeout,
    set_db_connection_name: set_db_connection_name
  }

  @provider = provider
  @pool = pool

  @thread_running = Concurrent::AtomicBoolean.new false
  @listening = Concurrent::AtomicBoolean.new false
end

Instance Method Details

#startObject

Starts the listener



49
50
51
52
53
54
# File 'lib/exekutor/internal/listener.rb', line 49

def start
  return false unless compare_and_set_state :pending, :started

  start_thread
  true
end

#stopObject

Stops the listener



57
58
59
60
61
62
63
64
# File 'lib/exekutor/internal/listener.rb', line 57

def stop
  self.state = :stopped
  begin
    Exekutor::Job.connection.execute(%(NOTIFY "#{provider_channel}"))
  rescue ActiveRecord::StatementInvalid, ActiveRecord::ConnectionNotEstablished
    # ignored
  end
end