Module: Gouda
- Defined in:
- lib/gouda.rb,
lib/gouda/bulk.rb,
lib/gouda/worker.rb,
lib/gouda/railtie.rb,
lib/gouda/version.rb,
lib/gouda/queue_constraints.rb,
lib/generators/gouda/install_generator.rb,
lib/gouda/active_job_extensions/interrupts.rb,
lib/gouda/active_job_extensions/concurrency.rb
Defined Under Namespace
Modules: ActiveJobExtensions, AnyQueue, BulkAdapterExtension, Scheduler
Classes: Adapter, CombinedShutdownCheck, ConcurrencyExceededError, Configuration, EmptyQueueShutdownCheck, ExceptQueueConstraint, InstallGenerator, InterruptError, JobFuse, OnlyQueuesConstraint, Railtie, ThreadSafeSet, TimerShutdownCheck, TrapShutdownCheck, Workload
Constant Summary
- POLL_INTERVAL_DURATION_SECONDS = 1
- UNINITIALISED_DATABASE_EXCEPTIONS = [ActiveRecord::NoDatabaseError, ActiveRecord::StatementInvalid, ActiveRecord::ConnectionNotEstablished]
- VERSION = "0.1.14"
Class Method Summary
- .config ⇒ Object
- .configure {|config| ... } ⇒ Object
- .create_tables(active_record_schema) ⇒ Object
- .enqueue_jobs_via_their_adapters(active_jobs) ⇒ Object
  This method exists in edge Rails, so it can probably be replaced later: github.com/rails/rails/commit/9b62f88a2fde0d2bf8c4f6e3bcd06ecba7ca9d8d.
- .in_bulk(&blk) ⇒ Object
  Inside this call, all `perform_later` calls on ActiveJob subclasses (including mailers) will be buffered.
- .instrument(channel, options) ⇒ Object
- .logger ⇒ Object
- .parse_queue_constraint(queue_constraint_str) ⇒ Hash
  Parse a string representing a group of queues into a queue constraint. Note that this works similarly to good_job.
- .sleep_with_interruptions(n_seconds, must_abort_proc) ⇒ Object
- .start ⇒ Object
- .suppressing_sql_logs ⇒ Object
- .worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue) ⇒ Object
  Start looping, taking work from the queue and performing it, over multiple worker threads.
Class Method Details
.config ⇒ Object
# File 'lib/gouda.rb', line 56

def self.config
  @config ||= Configuration.new
end
.configure {|config| ... } ⇒ Object
# File 'lib/gouda.rb', line 60

def self.configure
  yield config
end
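A minimal configuration sketch, assuming the Configuration object exposes writers for these settings; `worker_thread_count` and `polling_sleep_interval_seconds` are the settings referenced elsewhere on this page, and the initializer path is illustrative only:

# config/initializers/gouda.rb (illustrative path)
Gouda.configure do |config|
  config.worker_thread_count = 4               # read by Gouda.start and worker_loop
  config.polling_sleep_interval_seconds = 0.2  # read by sleep_with_interruptions
end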
.create_tables(active_record_schema) ⇒ Object
# File 'lib/gouda.rb', line 100

def self.create_tables(active_record_schema)
  active_record_schema.create_enum :gouda_workload_state, %w[enqueued executing finished]
  active_record_schema.create_table :gouda_workloads, id: :uuid do |t|
    t.uuid :active_job_id, null: false
    t.timestamp :scheduled_at, null: false
    t.timestamp :execution_started_at
    t.timestamp :execution_finished_at
    t.timestamp :last_execution_heartbeat_at
    t.timestamp :interrupted_at, null: true
    t.string :scheduler_key, null: true
    t.string :queue_name, null: false, default: "default"
    t.integer :priority
    t.string :active_job_class_name, null: false
    t.jsonb :serialized_params
    t.jsonb :error, default: {}, null: false
    t.enum :state, enum_type: :gouda_workload_state, default: "enqueued", null: false
    t.string :execution_concurrency_key
    t.string :enqueue_concurrency_key
    t.string :executing_on
    t.integer :position_in_bulk
    t.timestamps
  end

  active_record_schema.add_index :gouda_workloads, [:priority, :id, :scheduled_at], where: "state = 'enqueued'", name: :gouda_checkout_all_index
  active_record_schema.add_index :gouda_workloads, [:id, :last_execution_heartbeat_at], where: "state = 'executing'", name: :gouda_last_heartbeat_index
  active_record_schema.add_index :gouda_workloads, [:enqueue_concurrency_key], where: "state = 'enqueued' AND enqueue_concurrency_key IS NOT NULL", unique: true, name: :guard_double_enqueue
  active_record_schema.add_index :gouda_workloads, [:scheduler_key], where: "state = 'enqueued' AND scheduler_key IS NOT NULL", unique: true, name: :guard_double_schedule
  active_record_schema.add_index :gouda_workloads, [:execution_concurrency_key], where: "state = 'executing' AND execution_concurrency_key IS NOT NULL", unique: true, name: :guard_double_exec
  active_record_schema.add_index :gouda_workloads, [:active_job_id], name: :same_job_display_idx
  active_record_schema.add_index :gouda_workloads, [:priority], order: {priority: "ASC NULLS LAST"}, name: :ordered_priority_idx
  active_record_schema.add_index :gouda_workloads, [:last_execution_heartbeat_at], name: :index_gouda_workloads_on_last_execution_heartbeat_at
  active_record_schema.add_index :gouda_workloads, [:scheduler_key], name: :index_gouda_workloads_on_scheduler_key

  active_record_schema.create_table :gouda_job_fuses, id: false do |t|
    t.string :active_job_class_name, null: false
    t.timestamps
  end
end
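As a sketch (not the install generator's actual output), `create_tables` can be driven from an ordinary migration, since the method only needs an object that responds to `create_enum`, `create_table` and `add_index`; the class name and Rails version below are illustrative:

class CreateGoudaTables < ActiveRecord::Migration[7.1]
  def up
    Gouda.create_tables(self) # the migration object plays the role of active_record_schema
  end

  def down
    drop_table :gouda_workloads
    drop_table :gouda_job_fuses
    drop_enum :gouda_workload_state
  end
end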
.enqueue_jobs_via_their_adapters(active_jobs) ⇒ Object
This method exists in edge Rails, so it can probably be replaced later: github.com/rails/rails/commit/9b62f88a2fde0d2bf8c4f6e3bcd06ecba7ca9d8d
# File 'lib/gouda/bulk.rb', line 34

def self.enqueue_jobs_via_their_adapters(active_jobs)
  jobs_per_adapter = active_jobs.compact.group_by { |aj| aj.class.queue_adapter }
  jobs_per_adapter.each_pair do |adapter, active_jobs|
    if adapter.respond_to?(:enqueue_all)
      adapter.enqueue_all(active_jobs)
    else
      active_jobs.each { |aj| adapter.enqueue(aj) }
    end
  end
end
.in_bulk(&blk) ⇒ Object
Inside this call, all `perform_later` calls on ActiveJob subclasses (including mailers) will be buffered. The call is reentrant, so you can have multiple `in_bulk` calls with arbitrary nesting. At the end of the block, the buffered jobs will be enqueued using their respective adapters. If an adapter supports `enqueue_all` (Sidekiq does in recent releases of Rails, for example), this functionality will be used. This method is especially useful when doing things such as mass emails, or maintenance tasks where a large swath of jobs gets enqueued at once.
# File 'lib/gouda/bulk.rb', line 20

def self.in_bulk(&blk)
  if Thread.current[:gouda_bulk_buffer].nil?
    Thread.current[:gouda_bulk_buffer] = []
    retval = yield
    buf, Thread.current[:gouda_bulk_buffer] = Thread.current[:gouda_bulk_buffer], nil
    enqueue_jobs_via_their_adapters(buf)
    retval
  else # There already is an open bulk
    yield
  end
end
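A usage sketch for `in_bulk`; `User`, `WelcomeMailer` and `ReindexJob` are hypothetical names used only for illustration:

Gouda.in_bulk do
  User.where(newly_registered: true).find_each do |user|
    WelcomeMailer.with(user: user).welcome.deliver_later # buffered, not enqueued yet
    ReindexJob.perform_later(user.id)                    # also buffered
  end
end
# On block exit all buffered jobs are handed to their adapters,
# using enqueue_all where the adapter supports it.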
.instrument(channel, options) ⇒ Object
# File 'lib/gouda.rb', line 96

def self.instrument(channel, options, &)
  ActiveSupport::Notifications.instrument("#{channel}.gouda", options, &)
end
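Since `instrument` namespaces events as "#{channel}.gouda", the `:exception` notifications emitted from the housekeeping loop in `worker_loop` below can be observed with a standard ActiveSupport subscriber, for example:

ActiveSupport::Notifications.subscribe("exception.gouda") do |_name, _start, _finish, _id, payload|
  Rails.logger.error("Gouda error: #{payload[:exception].inspect}")
end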
.logger ⇒ Object
# File 'lib/gouda.rb', line 64

def self.logger
  # By default, return a logger that sends data nowhere. The `Rails.logger` method
  # only becomes available later in the Rails lifecycle.
  @fallback_gouda_logger ||= ActiveSupport::Logger.new($stdout).tap do |logger|
    logger.level = Logger::WARN
  end

  # We want the Rails-configured loggers to take precedence over ours, since Gouda
  # is just an ActiveJob adapter and the Workload is just an ActiveRecord, in the end.
  # So it should be up to the developer of the app, not to us, to set the logger up
  # and configure it. There are also gems such as "stackdriver" from Google which
  # rather unceremoniously overwrite the Rails logger with their own. If that happens,
  # it is the choice of the user to do so - and we should honor that choice. Same for
  # the logging level - the Rails logger level must take precedence. Same for logger
  # broadcasts which get set up, for example, by the Rails console when you start it.
  Rails.try(:logger) || ActiveJob::Base.try(:logger) || @fallback_gouda_logger
end
.parse_queue_constraint(queue_constraint_str) ⇒ Hash
Parse a string representing a group of queues into a queue constraint. Note that this works similarly to good_job. For example, the constraints do not necessarily compose all that well.
# File 'lib/gouda/queue_constraints.rb', line 56

def self.parse_queue_constraint(queue_constraint_str)
  string = queue_constraint_str.presence || "*"
  case string.first
  when "-"
    exclude_queues = true
    string = string[1..]
  when "+"
    string = string[1..]
  end

  queues = string.split(",").map(&:strip)

  if queues.include?("*")
    AnyQueue
  elsif exclude_queues
    ExceptQueueConstraint.new([queues])
  else
    OnlyQueuesConstraint.new([queues])
  end
end
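The constraint string follows the prefixes handled in the method body above; a short usage sketch (queue names are illustrative):

Gouda.parse_queue_constraint("*")                 #=> Gouda::AnyQueue
Gouda.parse_queue_constraint("+mailers,billing")  #=> OnlyQueuesConstraint for just those two queues
Gouda.parse_queue_constraint("-low_priority")     #=> ExceptQueueConstraint excluding "low_priority"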
.sleep_with_interruptions(n_seconds, must_abort_proc) ⇒ Object
# File 'lib/gouda/worker.rb', line 181

def self.sleep_with_interruptions(n_seconds, must_abort_proc)
  start_time_seconds = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  # remaining_seconds = n_seconds
  check_interval_seconds = Gouda.config.polling_sleep_interval_seconds
  loop do
    return if must_abort_proc.call
    return if Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time_seconds >= n_seconds
    sleep(check_interval_seconds)
  end
end
.start ⇒ Object
# File 'lib/gouda.rb', line 43

def self.start
  queue_constraint = if ENV["GOUDA_QUEUES"]
    Gouda.parse_queue_constraint(ENV["GOUDA_QUEUES"])
  else
    Gouda::AnyQueue
  end

  logger.info("Gouda version: #{Gouda::VERSION}")
  logger.info("Worker threads: #{Gouda.config.worker_thread_count}")

  worker_loop(n_threads: Gouda.config.worker_thread_count, queue_constraint: queue_constraint)
end
.suppressing_sql_logs ⇒ Object
# File 'lib/gouda.rb', line 82

def self.suppressing_sql_logs(&)
  # This is used for frequently-called methods that poll the DB. If logging is done at a low level (DEBUG)
  # those methods print a lot of SQL into the logs, on every poll. While that is useful if
  # you collect SQL queries from the logs, in most cases - especially if this is used
  # in a side-thread inside Puma - the output might be quite annoying. So silence the
  # logger when we poll, but just to INFO. Omitting DEBUG-level messages gets rid of the SQL.
  if Gouda::Workload.logger
    Gouda::Workload.logger.silence(Logger::INFO, &)
  else
    # In tests (and at earlier stages of the Rails boot cycle) the global ActiveRecord logger may be nil
    yield
  end
end
.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue) ⇒ Object
Start looping, taking work from the queue and performing it, over multiple worker threads. Once the `check_shutdown` callable returns `true` the threads will cleanly terminate and the method will return (so it is blocking).
# File 'lib/gouda/worker.rb', line 117

def self.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue)
  # We need quite a few things when starting the loop - we have to be far enough into the Rails bootup sequence
  # that both the application and the executor are available
  #
  # raise "Rails is not loaded yet" unless defined?(Rails) && Rails.respond_to?(:application)
  # raise "Rails application is not loaded yet" unless Rails.application
  # raise "Rails executor not available yet" unless Rails.application.executor

  check_shutdown = CombinedShutdownCheck.new(*check_shutdown) if !check_shutdown.respond_to?(:call) && check_shutdown.is_a?(Array)

  worker_id = [Socket.gethostname, Process.pid, SecureRandom.uuid].join("-")
  executing_workload_ids = ThreadSafeSet.new

  raise ArgumentError, "You need at least 1 worker thread, but you requested #{n_threads}" if n_threads < 1
  worker_threads = n_threads.times.map do
    Thread.new do
      worker_id_and_thread_id = [worker_id, "t0x#{Thread.current.object_id.to_s(16)}"].join("-")
      loop do
        break if check_shutdown.call

        did_process = Gouda.config.app_executor.wrap do
          Gouda::Workload.checkout_and_perform_one(executing_on: worker_id_and_thread_id, queue_constraint: queue_constraint, in_progress: executing_workload_ids)
        end

        # If no job was retrieved the queue is likely empty. Relax the polling then and ease off.
        # If a job was retrieved it is likely that a burst has just been enqueued, and we do not
        # sleep but proceed to attempt to retrieve the next job right after.
        jitter_sleep_interval = POLL_INTERVAL_DURATION_SECONDS + (POLL_INTERVAL_DURATION_SECONDS * 0.25)
        sleep_with_interruptions(jitter_sleep_interval, check_shutdown) unless did_process
      rescue => e
        warn "Uncaught exception during perform (#{e.class} - #{e})"
      end
    end
  end

  # Do the housekeeping tasks on main
  loop do
    break if check_shutdown.call

    Gouda.config.app_executor.wrap do
      # Mark known executing jobs as such. If a worker process is killed or the machine it is running on dies,
      # a stale timestamp can indicate to us that the job was orphaned and is marked as "executing"
      # even though the worker it was running on has failed for whatever reason.
      # Later on we can figure out what to do with those jobs (re-enqueue them or toss them)
      Gouda.suppressing_sql_logs do # these updates will also be very frequent with long-running jobs
        Gouda::Workload.where(id: executing_workload_ids.to_a, state: "executing").update_all(executing_on: worker_id, last_execution_heartbeat_at: Time.now.utc)
      end

      # Find jobs which just hung and clean them up (mark them as "finished" and enqueue replacement workloads if possible)
      Gouda::Workload.reap_zombie_workloads
    rescue => e
      Gouda.instrument(:exception, {exception: e})
      warn "Uncaught exception during housekeeping (#{e.class} - #{e})"
    end

    # Jitter the sleep so that the workers booted at the same time do not all dogpile
    randomized_sleep_duration_s = POLL_INTERVAL_DURATION_SECONDS + (POLL_INTERVAL_DURATION_SECONDS.to_f * rand)
    sleep_with_interruptions(randomized_sleep_duration_s, check_shutdown)
  end
ensure
  worker_threads&.map(&:join)
end
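A hedged sketch of invoking the loop directly, mirroring what `.start` does but with an explicit queue constraint; the excluded queue name is illustrative only:

# Blocks until the shutdown check fires (by default a signal trap).
Gouda.worker_loop(
  n_threads: Gouda.config.worker_thread_count,
  queue_constraint: Gouda.parse_queue_constraint("-emails")
)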