Class: Delayed::Backend::ActiveRecord::Job
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Delayed::Backend::ActiveRecord::Job
- Includes:
- Base
- Defined in:
- lib/delayed/backend/active_record.rb
Overview
A job object that is persisted to the database. Contains the work object as a YAML field.
Class Method Summary collapse
- .after_fork ⇒ Object
- .before_fork ⇒ Object
-
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
-
.db_time_now ⇒ Object
Get the current time (GMT or local, depending on the DB). Note: this does not ping the DB to get the time, so all your clients must have synchronized clocks.
- .default_timezone ⇒ Object
- .ready_to_run(worker_name, max_run_time) ⇒ Object
- .reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object
- .reserve_with_scope(ready_scope, worker, now) ⇒ Object
- .reserve_with_scope_using_default_sql(ready_scope, worker, now) ⇒ Object
- .reserve_with_scope_using_optimized_mssql(ready_scope, worker, now) ⇒ Object
- .reserve_with_scope_using_optimized_mysql(ready_scope, worker, now) ⇒ Object
- .reserve_with_scope_using_optimized_postgres(ready_scope, worker, now) ⇒ Object
- .reserve_with_scope_using_optimized_sql(ready_scope, worker, now) ⇒ Object
- .set_delayed_job_table_name ⇒ Object
Instance Method Summary collapse
Class Method Details
.after_fork ⇒ Object
72 73 74 |
# File 'lib/delayed/backend/active_record.rb', line 72
# Calls `establish_connection` on ::ActiveRecord::Base and returns its result.
# NOTE(review): presumably invoked in the child process after a fork — confirm against the caller.
def self.after_fork
  active_record_base = ::ActiveRecord::Base
  active_record_base.establish_connection
end
.before_fork ⇒ Object
64 65 66 67 68 69 70 |
# File 'lib/delayed/backend/active_record.rb', line 64
# Clears all connections held by the ActiveRecord connection handler.
#
# ActiveRecord 7.1.0 and newer are called with the explicit `:all` argument;
# older versions are called without arguments.
def self.before_fork
  predates_7_1 = Gem::Version.new(::ActiveRecord::VERSION::STRING) < Gem::Version.new("7.1.0")
  handler = ::ActiveRecord::Base.connection_handler
  return handler.clear_all_connections! if predates_7_1

  handler.clear_all_connections!(:all)
end
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
77 78 79 |
# File 'lib/delayed/backend/active_record.rb', line 77
# Releases every job currently locked by the given worker by nulling out
# both `locked_by` and `locked_at`.
#
# worker_name - value matched against the `locked_by` column.
#
# Returns whatever `update_all` returns for the matching rows.
def self.clear_locks!(worker_name)
  held_by_worker = where(locked_by: worker_name)
  held_by_worker.update_all(locked_by: nil, locked_at: nil)
end
.db_time_now ⇒ Object
Get the current time (GMT or local, depending on the DB). Note: this does not ping the DB to get the time, so all your clients must have synchronized clocks.
188 189 190 191 192 193 194 195 196 |
# File 'lib/delayed/backend/active_record.rb', line 188
# Returns the current time without querying the database.
#
# Preference order:
#   1. `Time.zone.now` when `Time.zone` is set,
#   2. `Time.now.utc` when `default_timezone` is `:utc`,
#   3. plain `Time.now` otherwise.
def self.db_time_now
  return Time.zone.now if Time.zone
  return Time.now.utc if default_timezone == :utc

  Time.now # rubocop:disable Rails/TimeZone
end
.default_timezone ⇒ Object
198 199 200 201 202 203 204 |
# File 'lib/delayed/backend/active_record.rb', line 198
# Reads `default_timezone` from the ::ActiveRecord module when it responds
# to that method, otherwise from ::ActiveRecord::Base.
def self.default_timezone
  timezone_owner = ::ActiveRecord.respond_to?(:default_timezone) ? ::ActiveRecord : ::ActiveRecord::Base
  timezone_owner.default_timezone
end
.ready_to_run(worker_name, max_run_time) ⇒ Object
55 56 57 58 59 60 61 62 |
# File 'lib/delayed/backend/active_record.rb', line 55
# Builds the scope of jobs a worker may pick up: not failed, and either
# already locked by this worker, or due (`run_at` in the past) with no lock
# or a lock older than `max_run_time`.
#
# worker_name  - value compared against `locked_by`.
# max_run_time - duration subtracted from the current time to find stale locks.
def self.ready_to_run(worker_name, max_run_time)
  due_before = db_time_now
  lock_expired_before = db_time_now - max_run_time
  conditions = "((run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL"

  where(conditions, due_before, lock_expired_before, worker_name)
end
.reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object
81 82 83 84 85 86 87 88 89 90 |
# File 'lib/delayed/backend/active_record.rb', line 81
# Attempts to reserve one job for the given worker.
#
# Starts from `ready_to_run`, narrows it with the `min_priority`,
# `max_priority`, `for_queues` and `by_priority` scopes, then hands the
# result to `reserve_with_scope` together with the current DB time.
#
# worker       - object providing `name` (further use happens in the reserve strategies).
# max_run_time - defaults to `Worker.max_run_time`.
def self.reserve(worker, max_run_time = Worker.max_run_time)
  candidates = ready_to_run(worker.name, max_run_time)
  candidates = candidates.min_priority.max_priority.for_queues.by_priority

  reserve_with_scope(candidates, worker, db_time_now)
end
.reserve_with_scope(ready_scope, worker, now) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/delayed/backend/active_record.rb', line 92
# Dispatches to a reservation implementation according to the configured
# `reserve_sql_strategy`. Returns nil when the strategy is neither
# `:optimized_sql` nor `:default_sql`.
def self.reserve_with_scope(ready_scope, worker, now)
  strategy = Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy

  if strategy == :optimized_sql
    # Optimizations for faster lookups on some common databases
    reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
  elsif strategy == :default_sql
    # Slower but in some cases more unproblematic strategy to lookup records
    # See https://github.com/collectiveidea/delayed_job_active_record/pull/89 for more details.
    reserve_with_scope_using_default_sql(ready_scope, worker, now)
  end
end
.reserve_with_scope_using_default_sql(ready_scope, worker, now) ⇒ Object
118 119 120 121 122 123 124 125 126 127 |
# File 'lib/delayed/backend/active_record.rb', line 118
# The long-standing, possibly slower reservation strategy.
#
# Only ids are selected for the candidate rows (up to `worker.read_ahead` of
# them); the full record is loaded via `reload` only once the lock update
# affected exactly one row. This avoids reading whole job records for
# candidates we fail to lock, which matters for large read_ahead settings
# and large payloads.
#
# Returns the first candidate that was locked and reloaded, or nil if none was.
def self.reserve_with_scope_using_default_sql(ready_scope, worker, now)
  id_only_candidates = ready_scope.limit(worker.read_ahead).select(:id)

  id_only_candidates.detect do |candidate|
    rows_locked = ready_scope.where(id: candidate.id).update_all(locked_at: now, locked_by: worker.name)
    rows_locked == 1 && candidate.reload
  end
end
.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now) ⇒ Object
170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/delayed/backend/active_record.rb', line 170
# Reservation strategy using a hand-written UPDATE with an id subquery.
#
# Returns nil when the UPDATE reports 0, otherwise the first non-failed row
# carrying this worker's `locked_by` and the given `locked_at`.
def self.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
  # The MSSQL driver doesn't generate a limit clause when update_all
  # is called directly, so the limited scope is rendered to SQL by hand.
  limited_scope_sql = ready_scope.limit(1).to_sql
  # select("id") doesn't generate a subquery, so force a subquery
  id_subquery_sql = "SELECT id FROM (#{limited_scope_sql}) AS x"
  quoted_table = connection.quote_table_name(table_name)
  update_sql = "UPDATE #{quoted_table} SET locked_at = ?, locked_by = ? WHERE id IN (#{id_subquery_sql})"

  affected = connection.execute(sanitize_sql([update_sql, now, worker.name]))
  if affected == 0
    nil
  else
    # MSSQL JDBC doesn't support OUTPUT INSERTED.* for returning a result set, so query locked row
    where(locked_at: now, locked_by: worker.name, failed_at: nil).first
  end
end
.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/delayed/backend/active_record.rb', line 155
# Reservation strategy for MySQL and possibly other databases that support
# UPDATE...LIMIT. One query locks a job, a second one fetches it.
#
# Returns nil when no row was updated, otherwise the first non-failed row
# carrying this worker's `locked_by` and the truncated `locked_at`.
def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
  # Strip sub-second precision from the time object. MySQL 5.6.4 onwards
  # has sub-second precision, but the stored datetime doesn't keep it, so it
  # is discarded on update; the WHERE clause, however, would query with
  # precision. Truncating keeps the written and the queried value equal.
  lock_time = now.change(usec: 0)

  rows_locked = ready_scope.limit(1).update_all(locked_at: lock_time, locked_by: worker.name)
  if rows_locked == 0
    nil
  else
    where(locked_at: lock_time, locked_by: worker.name, failed_at: nil).first
  end
end
.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now) ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/delayed/backend/active_record.rb', line 129
# Reservation strategy for PostgreSQL: a single UPDATE ... RETURNING * whose
# id subquery locks one row 'FOR UPDATE'.
#
# Custom SQL is required because postgres does not support UPDATE...LIMIT.
# http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE
# Note: active_record would attempt to generate UPDATE...LIMIT like SQL for
# Postgres if a .limit() filter were used with update_all, but it would not
# use 'FOR UPDATE' and we would have many locking conflicts.
#
# Returns the first row produced by the statement (nil when there is none).
def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
  locking_subquery = ready_scope.limit(1).lock(true).select("id").to_sql

  # On PostgreSQL >= 9.5 we leverage SKIP LOCK to avoid multiple workers blocking each other
  # when attempting to get the next available job
  # https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE
  skip_locked_available = connection.send(:postgresql_version) >= 9_05_00 # rubocop:disable Style/NumericLiterals
  locking_subquery = "#{locking_subquery} SKIP LOCKED" if skip_locked_available

  quoted_table = connection.quote_table_name(table_name)
  update_sql = "UPDATE #{quoted_table} SET locked_at = ?, locked_by = ? WHERE id IN (#{locking_subquery}) RETURNING *"

  find_by_sql([update_sql, now, worker.name]).first
end
.reserve_with_scope_using_optimized_sql(ready_scope, worker, now) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/delayed/backend/active_record.rb', line 104
# Picks the adapter-specific reservation routine based on
# `connection.adapter_name`; adapters not listed here use the default SQL
# strategy.
def self.reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
  adapter = connection.adapter_name

  if %w[PostgreSQL PostGIS].include?(adapter)
    reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
  elsif %w[MySQL Mysql2 Trilogy].include?(adapter)
    reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
  elsif %w[MSSQL Teradata].include?(adapter)
    reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
  else
    # Fallback for unknown / other DBMS
    reserve_with_scope_using_default_sql(ready_scope, worker, now)
  end
end
.set_delayed_job_table_name ⇒ Object
48 49 50 51 |
# File 'lib/delayed/backend/active_record.rb', line 48
# Sets this model's table name to "delayed_jobs" prefixed with
# ::ActiveRecord::Base.table_name_prefix.
def self.set_delayed_job_table_name
  prefix = ::ActiveRecord::Base.table_name_prefix
  self.table_name = "#{prefix}delayed_jobs"
end
Instance Method Details
#reload(*args) ⇒ Object
206 207 208 209 |
# File 'lib/delayed/backend/active_record.rb', line 206
# Calls `reset` on this job, then delegates to the superclass `reload`
# with the same arguments.
def reload(*args)
  reset
  super(*args)
end