Class: Exekutor::Internal::Listener
- Inherits:
- Object
- Hierarchy: Object > Exekutor::Internal::Listener
- Includes:
- Executable, Logger
- Defined in:
- lib/exekutor/internal/listener.rb
Overview
Listens for jobs to be executed
Defined Under Namespace
Classes: Error, JobParser, UnsupportedDatabase
Constant Summary
- JOB_ENQUEUED_CHANNEL = "exekutor::job_enqueued"
  The PG notification channel for enqueued jobs.
- PROVIDER_CHANNEL = "exekutor::worker::%s"
  The PG notification channel for a worker. Must be formatted with the worker ID.
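A minimal sketch of how the per-worker channel name might be derived from the template. The two constants are copied from the summary above; the helper `provider_channel_for` and the worker ID `"a1b2c3"` are hypothetical names used only for illustration and are not part of Exekutor.

```ruby
# Values copied from the constant summary above.
JOB_ENQUEUED_CHANNEL = "exekutor::job_enqueued"
PROVIDER_CHANNEL = "exekutor::worker::%s"

# Hypothetical helper: fills the %s placeholder with the worker ID.
def provider_channel_for(worker_id)
  format(PROVIDER_CHANNEL, worker_id)
end

puts provider_channel_for("a1b2c3")
```

Because the template contains a `%s` placeholder, it is not a usable channel name until it has been formatted.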
Constants included from Executable
Instance Method Summary
- #initialize(worker_id:, provider:, pool:, queues: nil, min_priority: nil, max_priority: nil, wait_timeout: 60, set_db_connection_name: false) ⇒ Listener (constructor)
  Creates a new listener.
- #start ⇒ Object
  Starts the listener.
- #stop ⇒ Object
  Stops the listener.
Methods included from Executable
#consecutive_errors, #restart_delay, #running?, #state
Constructor Details
#initialize(worker_id:, provider:, pool:, queues: nil, min_priority: nil, max_priority: nil, wait_timeout: 60, set_db_connection_name: false) ⇒ Listener
Creates a new listener
# File 'lib/exekutor/internal/listener.rb', line 27
def initialize(worker_id:, provider:, pool:, queues: nil, min_priority: nil, max_priority: nil, wait_timeout: 60,
               set_db_connection_name: false)
  super()
  @config = {
    worker_id: worker_id,
    queues: queues.presence,
    min_priority: min_priority,
    max_priority: max_priority,
    wait_timeout: wait_timeout,
    set_db_connection_name: set_db_connection_name
  }

  @provider = provider
  @pool = pool

  @thread_running = Concurrent::AtomicBoolean.new false
  @listening = Concurrent::AtomicBoolean.new false
end
Instance Method Details
#start ⇒ Object
Starts the listener
# File 'lib/exekutor/internal/listener.rb', line 49
def start
  return false unless compare_and_set_state :pending, :started

  start_thread
  true
end
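The guard in `#start` means that only a call made while the state is `:pending` starts the thread; any other call returns `false`. The sketch below illustrates that compare-and-set pattern in isolation. `StartOnce`, its Mutex-based `compare_and_set_state`, and the `thread_starts` counter are hypothetical stand-ins and are not how the `Executable` module is implemented.

```ruby
# Hypothetical, simplified stand-in for the state handling used by #start.
class StartOnce
  attr_reader :thread_starts

  def initialize
    @state = :pending
    @mutex = Mutex.new
    @thread_starts = 0 # counts how often the "thread" would have been started
  end

  def start
    return false unless compare_and_set_state(:pending, :started)

    @thread_starts += 1 # stands in for start_thread
    true
  end

  def state
    @mutex.synchronize { @state }
  end

  private

  # Sets the state to new_state only if it currently equals expected.
  def compare_and_set_state(expected, new_state)
    @mutex.synchronize do
      return false unless @state == expected

      @state = new_state
      true
    end
  end
end

listener = StartOnce.new
puts listener.start # first call transitions :pending to :started
puts listener.start # second call finds :started and does nothing
```

Checking and updating the state in one atomic step is what keeps two concurrent callers from both starting a thread.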
#stop ⇒ Object
Stops the listener
# File 'lib/exekutor/internal/listener.rb', line 57
def stop
  self.state = :stopped
  begin
    Exekutor::Job.connection.execute(%(NOTIFY "#{provider_channel}"))
  rescue ActiveRecord::StatementInvalid, ActiveRecord::ConnectionNotEstablished
    # ignored
  end
end
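`#stop` first changes the state and then sends a NOTIFY on the worker's own channel, presumably so that a thread blocked waiting for notifications wakes up and can observe the new state. The sketch below shows that wake-up pattern without a database. The class `WakeableLoop`, the `:wake_up` message, and the use of a `Queue` in place of the PG notification channel are all assumptions made for illustration; unlike the real `#stop`, this version also joins the thread so the result can be observed.

```ruby
# Hypothetical sketch: a Queue stands in for the PG notification channel.
class WakeableLoop
  def initialize
    @notifications = Queue.new
    @state = :pending
    @thread = nil
  end

  def start
    @state = :started
    @thread = Thread.new do
      # Blocks in pop, much like a listener waiting for a NOTIFY.
      @notifications.pop while @state == :started
    end
  end

  def stop
    @state = :stopped
    # Without this message the thread could stay blocked in pop indefinitely.
    @notifications.push(:wake_up)
    @thread&.join(5)
  end

  def alive?
    !@thread.nil? && @thread.alive?
  end
end

listener = WakeableLoop.new
listener.start
listener.stop
puts listener.alive?
```

The order matters: the state is updated before the wake-up is sent, so the woken thread sees `:stopped` when it re-checks the loop condition.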