Module: Gouda

Defined in:
lib/gouda.rb,
lib/gouda/bulk.rb,
lib/gouda/worker.rb,
lib/gouda/railtie.rb,
lib/gouda/version.rb,
lib/gouda/queue_constraints.rb,
lib/generators/gouda/install_generator.rb,
lib/gouda/active_job_extensions/interrupts.rb,
lib/gouda/active_job_extensions/concurrency.rb

Defined Under Namespace

Modules: ActiveJobExtensions, AnyQueue, BulkAdapterExtension, Scheduler

Classes: Adapter, CombinedShutdownCheck, ConcurrencyExceededError, Configuration, EmptyQueueShutdownCheck, ExceptQueueConstraint, InstallGenerator, InterruptError, JobFuse, OnlyQueuesConstraint, Railtie, ThreadSafeSet, TimerShutdownCheck, TrapShutdownCheck, Workload

Constant Summary collapse

POLL_INTERVAL_DURATION_SECONDS =
1
UNINITIALISED_DATABASE_EXCEPTIONS =
[ActiveRecord::NoDatabaseError, ActiveRecord::StatementInvalid, ActiveRecord::ConnectionNotEstablished]
VERSION =
"0.1.14"

Class Method Summary collapse

Class Method Details

.config ⇒ Object



56
57
58
# File 'lib/gouda.rb', line 56

# Returns the shared Gouda configuration object.
#
# It is built lazily on first access and memoized on the module, so every caller
# (including `configure`) sees and mutates the same instance.
def self.config
  return @config if @config

  @config = Configuration.new
end

.configure {|config| ... } ⇒ Object

Yields:



60
61
62
# File 'lib/gouda.rb', line 60

# Yields the shared configuration object so settings can be changed in a block.
# Returns whatever the block returns.
def self.configure
  settings = config
  yield settings
end

.create_tables(active_record_schema) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/gouda.rb', line 100

# Creates the database objects Gouda needs: the `gouda_workload_state` enum type,
# the `gouda_workloads` table with its indexes, and the `gouda_job_fuses` table.
#
# @param active_record_schema [#create_enum, #create_table, #add_index] the object DDL is
#   issued through (presumably an ActiveRecord::Schema or migration instance - confirm at call site)
# NOTE(review): create_enum / t.enum, uuid primary keys and jsonb columns suggest this
#   requires PostgreSQL - confirm before using another adapter.
def self.create_tables(active_record_schema)
  # Workload lifecycle states. The enum type must exist before the table that references it.
  active_record_schema.create_enum :gouda_workload_state, %w[enqueued executing finished]
  # Workloads use a UUID primary key (id: :uuid).
  active_record_schema.create_table :gouda_workloads, id: :uuid do |t|
    t.uuid :active_job_id, null: false
    t.timestamp :scheduled_at, null: false
    t.timestamp :execution_started_at
    t.timestamp :execution_finished_at
    t.timestamp :last_execution_heartbeat_at
    t.timestamp :interrupted_at, null: true

    t.string :scheduler_key, null: true
    t.string :queue_name, null: false, default: "default"
    t.integer :priority
    t.string :active_job_class_name, null: false
    t.jsonb :serialized_params
    t.jsonb :error, default: {}, null: false
    t.enum :state, enum_type: :gouda_workload_state, default: "enqueued", null: false
    t.string :execution_concurrency_key
    t.string :enqueue_concurrency_key
    t.string :executing_on
    t.integer :position_in_bulk

    t.timestamps
  end

  # Partial indexes covering only enqueued / only executing rows respectively (presumably
  # serving the checkout query and the heartbeat-based zombie lookup - confirm against Workload).
  active_record_schema.add_index :gouda_workloads, [:priority, :id, :scheduled_at], where: "state = 'enqueued'", name: :gouda_checkout_all_index
  active_record_schema.add_index :gouda_workloads, [:id, :last_execution_heartbeat_at], where: "state = 'executing'", name: :gouda_last_heartbeat_index
  # Uniqueness guards, each limited by a partial WHERE clause: at most one *enqueued* row per
  # non-null enqueue_concurrency_key, at most one *enqueued* row per non-null scheduler_key,
  # and at most one *executing* row per non-null execution_concurrency_key.
  active_record_schema.add_index :gouda_workloads, [:enqueue_concurrency_key], where: "state = 'enqueued' AND enqueue_concurrency_key IS NOT NULL", unique: true, name: :guard_double_enqueue
  active_record_schema.add_index :gouda_workloads, [:scheduler_key], where: "state = 'enqueued' AND scheduler_key IS NOT NULL", unique: true, name: :guard_double_schedule
  active_record_schema.add_index :gouda_workloads, [:execution_concurrency_key], where: "state = 'executing' AND execution_concurrency_key IS NOT NULL", unique: true, name: :guard_double_exec
  # Plain (non-unique, non-partial) lookup indexes. The priority index sorts NULL priorities last.
  active_record_schema.add_index :gouda_workloads, [:active_job_id], name: :same_job_display_idx
  active_record_schema.add_index :gouda_workloads, [:priority], order: {priority: "ASC NULLS LAST"}, name: :ordered_priority_idx
  active_record_schema.add_index :gouda_workloads, [:last_execution_heartbeat_at], name: :index_gouda_workloads_on_last_execution_heartbeat_at
  active_record_schema.add_index :gouda_workloads, [:scheduler_key], name: :index_gouda_workloads_on_scheduler_key

  # Job fuses have no surrogate `id` column (id: false), and no index on
  # active_job_class_name is created here.
  active_record_schema.create_table :gouda_job_fuses, id: false do |t|
    t.string :active_job_class_name, null: false

    t.timestamps
  end
end

.enqueue_jobs_via_their_adapters(active_jobs) ⇒ Object

An equivalent method exists in edge Rails, so this one can probably be replaced later: github.com/rails/rails/commit/9b62f88a2fde0d2bf8c4f6e3bcd06ecba7ca9d8d



34
35
36
37
38
39
40
41
42
43
# File 'lib/gouda/bulk.rb', line 34

# Enqueues a batch of ActiveJob instances, each through the queue adapter its class uses.
#
# nil entries are ignored. Jobs are bucketed per adapter. Adapters that respond to
# `enqueue_all` receive their whole bucket in one call; the others get one `enqueue` per job.
#
# @param active_jobs [Array<ActiveJob::Base, nil>]
# @return [Hash] the adapter => jobs grouping that was enqueued
def self.enqueue_jobs_via_their_adapters(active_jobs)
  active_jobs.compact.group_by { |job| job.class.queue_adapter }.each do |adapter, adapter_jobs|
    next adapter.enqueue_all(adapter_jobs) if adapter.respond_to?(:enqueue_all)

    adapter_jobs.each { |job| adapter.enqueue(job) }
  end
end

.in_bulk(&blk) ⇒ Object

Inside this call, all `perform_later` calls on ActiveJob subclasses (including mailers) will be buffered. The call is reentrant, so you can have multiple `in_bulk` calls with arbitrary nesting. At the end of the outermost block, the buffered jobs will be enqueued using their respective adapters. If an adapter supports `enqueue_all` (Sidekiq does in recent releases of Rails, for example), this functionality will be used. This method is especially useful when doing things such as mass emails, or maintenance tasks where a large swath of jobs gets enqueued at once.

Examples:

Gouda.in_bulk do
  User.recently_joined.find_each do |recently_joined_user|
    GreetingJob.perform_later(recently_joined_user)
  end
end

Returns:

  • (Object)

    the return value of the block



20
21
22
23
24
25
26
27
28
29
30
# File 'lib/gouda/bulk.rb', line 20

# Buffers jobs enqueued inside the block and enqueues them all at the end of the outermost
# `in_bulk` call, via `enqueue_jobs_via_their_adapters`.
#
# Nested calls just run their block; the enclosing call owns the per-thread buffer and flushes it.
# If the block raises, the buffered jobs are discarded (not enqueued), and the buffer is always
# cleared so later `in_bulk` calls on this thread start a fresh bulk.
#
# @return [Object] the return value of the block
def self.in_bulk(&blk)
  # Reentrant call: an enclosing in_bulk already owns the buffer and will flush it.
  return yield unless Thread.current[:gouda_bulk_buffer].nil?

  Thread.current[:gouda_bulk_buffer] = []
  begin
    retval = yield
    buffered_jobs = Thread.current[:gouda_bulk_buffer]
  ensure
    # Always close the bulk, even when the block raised. A stale buffer left on the thread would
    # make every later in_bulk call here take the "nested" branch above and never flush.
    Thread.current[:gouda_bulk_buffer] = nil
  end

  # The buffer is already cleared at this point, so enqueueing is not treated as part of a bulk.
  enqueue_jobs_via_their_adapters(buffered_jobs)
  retval
end

.instrument(channel, options) ⇒ Object



96
97
98
# File 'lib/gouda.rb', line 96

# Publishes an ActiveSupport::Notifications event named "<channel>.gouda" with the given
# payload, forwarding the optional block (whose result is returned) to the instrumenter.
def self.instrument(channel, options, &)
  event_name = "#{channel}.gouda"
  ActiveSupport::Notifications.instrument(event_name, options, &)
end

.logger ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/gouda.rb', line 64

# Returns the logger Gouda should write to.
#
# We want the Rails-configured loggers to take precedence over ours, since Gouda
# is just an ActiveJob adapter and the Workload is just an ActiveRecord, in the end.
# So it should be up to the developer of the app, not to us, to set the logger up
# and configure it. There are also gems such as "stackdriver" from Google which
# rather unceremoniously overwrite the Rails logger with their own. If that happens,
# it is the choice of the user to do so - and we should honor that choice. Same for
# the logging level - the Rails logger level must take precedence. Same for logger
# broadcasts which get set up, for example, by the Rails console when you start it.
def self.logger
  configured_logger = Rails.try(:logger) || ActiveJob::Base.try(:logger)
  return configured_logger if configured_logger

  # Fallback for when neither Rails nor ActiveJob provides a logger (the `Rails.logger`
  # method only becomes available later in the Rails lifecycle): write WARN and above
  # to $stdout. Memoized so repeated calls share one instance.
  @fallback_gouda_logger ||= ActiveSupport::Logger.new($stdout).tap do |fallback|
    fallback.level = Logger::WARN
  end
end

.parse_queue_constraint(queue_constraint_str) ⇒ Gouda::AnyQueue, Gouda::ExceptQueueConstraint, Gouda::OnlyQueuesConstraint

Parse a string representing a group of queues into a queue constraint. Note that this works similarly to good_job; for example, the constraints do not necessarily compose all that well.

Examples:

Gouda.parse_queue_constraint('-queue1,queue2')
=> a Gouda::ExceptQueueConstraint for the queues "queue1" and "queue2"

Parameters:

  • queue_constraint_str (String)

    Queue string

Returns:

  • (Gouda::AnyQueue, Gouda::ExceptQueueConstraint, Gouda::OnlyQueuesConstraint)

    How to match a given queue:

    • Gouda::AnyQueue when the string is blank or any listed queue is "*" — all queues match.

    • A Gouda::ExceptQueueConstraint when the string starts with "-" — the listed queue names should not match.

    • A Gouda::OnlyQueuesConstraint otherwise (a leading "+" is optional) — only the listed queue names should match.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/gouda/queue_constraints.rb', line 56

# Parses a comma-separated queue list (as given in GOUDA_QUEUES) into a queue constraint.
#
# - blank input, or any entry equal to "*"  => AnyQueue (all queues match)
# - leading "-" ("-slow,heavy")              => ExceptQueueConstraint for the listed queues
# - otherwise, optional leading "+" ("a,b") => OnlyQueuesConstraint for the listed queues
#
# Entries are whitespace-stripped, and empty entries (e.g. from "a,,b") are dropped.
#
# @param queue_constraint_str [String, nil]
# @return [AnyQueue, ExceptQueueConstraint, OnlyQueuesConstraint]
def self.parse_queue_constraint(queue_constraint_str)
  string = queue_constraint_str.presence || "*"

  exclude_queues = false
  case string[0]
  when "-"
    exclude_queues = true
    string = string[1..]
  when "+"
    string = string[1..]
  end

  queues = string.split(",").map(&:strip).reject(&:empty?)

  # Pass the flat list of names. Wrapping it in another array (`[queues]`) would hand the
  # constraint a single nested element instead of the individual queue names.
  if queues.include?("*")
    AnyQueue
  elsif exclude_queues
    ExceptQueueConstraint.new(queues)
  else
    OnlyQueuesConstraint.new(queues)
  end
end

.sleep_with_interruptions(n_seconds, must_abort_proc) ⇒ Object



181
182
183
184
185
186
187
188
189
190
# File 'lib/gouda/worker.rb', line 181

# Sleeps for up to `n_seconds`, waking every `Gouda.config.polling_sleep_interval_seconds`
# to check `must_abort_proc`. Returns early as soon as the proc returns a truthy value.
#
# Each nap is capped at the time remaining until the deadline, so the method does not
# overshoot `n_seconds` when the polling interval is longer than what is left.
# Uses the monotonic clock, so wall-clock adjustments do not affect the duration.
#
# @param n_seconds [Numeric] maximum total time to sleep
# @param must_abort_proc [#call] checked before every nap
def self.sleep_with_interruptions(n_seconds, must_abort_proc)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + n_seconds
  check_interval_seconds = Gouda.config.polling_sleep_interval_seconds
  loop do
    return if must_abort_proc.call

    remaining_seconds = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return if remaining_seconds <= 0

    sleep([check_interval_seconds, remaining_seconds].min)
  end
end

.start ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/gouda.rb', line 43

# Boots a blocking worker loop using the configured thread count.
#
# When the GOUDA_QUEUES environment variable is set, it is parsed with
# `parse_queue_constraint` to choose which queues to work on. Otherwise all queues are used.
def self.start
  queues_setting = ENV["GOUDA_QUEUES"]
  queue_constraint = queues_setting ? Gouda.parse_queue_constraint(queues_setting) : Gouda::AnyQueue

  logger.info("Gouda version: #{Gouda::VERSION}")
  thread_count = Gouda.config.worker_thread_count
  logger.info("Worker threads: #{thread_count}")

  worker_loop(n_threads: thread_count, queue_constraint: queue_constraint)
end

.suppressing_sql_logs ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/gouda.rb', line 82

# Runs the block with the Workload (ActiveRecord) logger silenced below INFO, and returns
# the block's value.
#
# This is used for frequently-called methods that poll the DB. If logging is done at a low level (DEBUG)
# those methods print a lot of SQL into the logs, on every poll. While that is useful if
# you collect SQL queries from the logs, in most cases - especially if this is used
# in a side-thread inside Puma - the output might be quite annoying. So silence the
# logger when we poll, but just to INFO. Omitting DEBUG-level messages gets rid of the SQL.
def self.suppressing_sql_logs(&)
  workload_logger = Gouda::Workload.logger
  if workload_logger.respond_to?(:silence)
    workload_logger.silence(Logger::INFO, &)
  else
    # The ActiveRecord logger may be nil (in tests and at earlier stages of the Rails boot cycle),
    # or a plain Ruby ::Logger, which has no #silence. In both cases run the block unsilenced
    # rather than failing the poll with a NoMethodError.
    yield
  end
end

.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue) ⇒ Object

Start looping, taking work from the queue and performing it, over multiple worker threads. Once the `check_shutdown` callable returns `true`, the threads will cleanly terminate and the method will return (so it is blocking).

Parameters:

  • n_threads (Integer)

    How many worker threads to start. Housekeeping (heartbeats and zombie reaping) runs on the calling thread, which also needs a database connection, so ideally this should be the size of your connection pool minus 1.

  • check_shutdown (#call) (defaults to: TrapShutdownCheck.new)

    A callable object (can be a Proc etc.), or an Array of such callables, which will be combined into one check. Once it starts returning `true`, the worker threads and the housekeeping loop will cleanly exit.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/gouda/worker.rb', line 117

# Runs `n_threads` worker threads that check out and perform workloads. The calling thread
# runs the housekeeping loop: heartbeats for executing workloads and zombie reaping.
# Blocks until `check_shutdown` returns true, then joins the worker threads.
#
# @param n_threads [Integer] number of worker threads (at least 1)
# @param check_shutdown [#call, Array<#call>] shutdown check; an Array is combined into one check
# @param queue_constraint [Object] which queues to take work from (defaults to all queues)
# @raise [ArgumentError] if n_threads < 1
def self.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue)
  # We need quite a few things when starting the loop - we have to be far enough into the Rails bootup sequence
  # that both the application and the executor are available
  #
  # raise "Rails is not loaded yet" unless defined?(Rails) && Rails.respond_to?(:application)
  # raise "Rails application is not loaded yet" unless Rails.application
  # raise "Rails executor not available yet" unless Rails.application.executor

  # Validate first so a bad thread count fails before anything else is set up.
  raise ArgumentError, "You need at least 1 worker thread, but you requested #{n_threads}" if n_threads < 1

  check_shutdown = CombinedShutdownCheck.new(*check_shutdown) if !check_shutdown.respond_to?(:call) && check_shutdown.is_a?(Array)

  worker_id = [Socket.gethostname, Process.pid, SecureRandom.uuid].join("-")

  # Passed to every checkout as `in_progress:` and read by the heartbeat update below
  # (presumably populated by checkout_and_perform_one while a workload runs - confirm in Workload).
  executing_workload_ids = ThreadSafeSet.new

  worker_threads = Array.new(n_threads) do
    Thread.new do
      worker_id_and_thread_id = [worker_id, "t0x#{Thread.current.object_id.to_s(16)}"].join("-")
      loop do
        break if check_shutdown.call

        did_process = Gouda.config.app_executor.wrap do
          Gouda::Workload.checkout_and_perform_one(executing_on: worker_id_and_thread_id, queue_constraint: queue_constraint, in_progress: executing_workload_ids)
        end

        # If a job was retrieved it is likely that a burst has just been enqueued, so we do not
        # sleep but proceed to attempt to retrieve the next job right after.
        next if did_process

        # No job was retrieved, so the queue is likely empty: relax the polling and ease off.
        # Add up to 25% of random jitter so threads started together do not poll in lockstep.
        jitter_sleep_interval = POLL_INTERVAL_DURATION_SECONDS + (POLL_INTERVAL_DURATION_SECONDS * 0.25 * rand)
        sleep_with_interruptions(jitter_sleep_interval, check_shutdown)
      rescue => e
        warn "Uncaught exception during perform (#{e.class} - #{e})"
      end
    end
  end

  # Do the housekeeping tasks on the calling thread
  loop do
    break if check_shutdown.call

    Gouda.config.app_executor.wrap do
      # Mark known executing jobs as such. If a worker process is killed or the machine it is running on dies,
      # a stale timestamp can indicate to us that the job was orphaned and is marked as "executing"
      # even though the worker it was running on has failed for whatever reason.
      # Later on we can figure out what to do with those jobs (re-enqueue them or toss them)
      Gouda.suppressing_sql_logs do # these updates will also be very frequent with long-running jobs
        Gouda::Workload.where(id: executing_workload_ids.to_a, state: "executing").update_all(executing_on: worker_id, last_execution_heartbeat_at: Time.now.utc)
      end

      # Find jobs which just hung and clean them up (mark them as "finished" and enqueue replacement workloads if possible)
      Gouda::Workload.reap_zombie_workloads
    rescue => e
      Gouda.instrument(:exception, {exception: e})
      warn "Uncaught exception during housekeeping (#{e.class} - #{e})"
    end

    # Jitter the sleep so that the workers booted at the same time do not all dogpile
    randomized_sleep_duration_s = POLL_INTERVAL_DURATION_SECONDS + (POLL_INTERVAL_DURATION_SECONDS.to_f * rand)
    sleep_with_interruptions(randomized_sleep_duration_s, check_shutdown)
  end
ensure
  # Wait for every worker thread to exit.
  # NOTE(review): worker threads only stop once check_shutdown returns true, so if the loop above
  # exits via an exception while check_shutdown is still false, this join can block indefinitely.
  worker_threads&.each(&:join)
end