Class: Sidekiq::Config
- Inherits:
-
Object
- Object
- Sidekiq::Config
- Extended by:
- Forwardable
- Defined in:
- lib/sidekiq/config.rb
Overview
Sidekiq::Config represents the global configuration for an instance of Sidekiq.
Constant Summary collapse
- DEFAULTS =
{ labels: Set.new, require: ".", environment: nil, concurrency: 5, timeout: 25, poll_interval_average: nil, average_scheduled_poll_interval: 5, on_complex_arguments: :raise, # if the Iterable job runs longer than this value (in seconds), then the job # will be interrupted after the current iteration and re-enqueued at the back of the queue max_iteration_runtime: nil, error_handlers: [], death_handlers: [], lifecycle_events: { startup: [], quiet: [], shutdown: [], exit: [], # triggers when we fire the first heartbeat on startup OR repairing a network partition heartbeat: [], # triggers on EVERY heartbeat call, every 10 seconds beat: [] }, dead_max_jobs: 10_000, dead_timeout_in_seconds: 180 * 24 * 60 * 60, # 6 months reloader: proc { |&block| block.call }, backtrace_cleaner: ->(backtrace) { backtrace } }
- ERROR_HANDLER =
->(ex, ctx, cfg = Sidekiq.default_configuration) { Sidekiq::Context.with(ctx) do dev = cfg[:environment] == "development" fancy = dev && $stdout.tty? # π© # Weird logic here but we want to show the backtrace in local # development or if verbose logging is enabled. # # `full_message` contains the error class, message and backtrace # `detailed_message` contains the error class and message # # Absolutely terrible API names. Not useful at all to have two # methods with similar but obscure names. if dev || cfg.logger.debug? cfg.logger.info { ex.(highlight: fancy) } else cfg.logger.info { ex.(highlight: fancy) } end end }
Instance Attribute Summary collapse
-
#capsules ⇒ Object
readonly
Returns the value of attribute capsules.
-
#thread_priority ⇒ Object
Returns the value of attribute thread_priority.
Instance Method Summary collapse
-
#average_scheduled_poll_interval=(interval) ⇒ Object
How frequently Redis should be checked by a random Sidekiq process for scheduled and retriable jobs.
-
#capsule(name) {|cap| ... } ⇒ Object
register a new queue processing subsystem.
- #client_middleware {|@client_chain| ... } ⇒ Object
- #concurrency ⇒ Object
-
#concurrency=(val) ⇒ Object
LEGACY: edits the default capsule config.concurrency = 5.
-
#death_handlers ⇒ Object
Death handlers are called when all retries for a job have been exhausted and the job dies.
- #default_capsule(&block) ⇒ Object
-
#error_handlers ⇒ Object
Register a proc to handle any error which occurs within the Sidekiq process.
- #freeze! ⇒ Object
-
#handle_exception(ex, ctx = {}) ⇒ Object
INTERNAL USE ONLY.
-
#initialize(options = {}) ⇒ Config
constructor
A new instance of Config.
- #inspect ⇒ Object
- #logger ⇒ Object
- #logger=(logger) ⇒ Object
-
#lookup(name, default_class = nil) ⇒ Object
find a singleton.
- #new_redis_pool(size, name = "unset") ⇒ Object
-
#on(event, &block) ⇒ Object
Register a block to run at a point in the Sidekiq lifecycle.
- #queues ⇒ Object
-
#queues=(val) ⇒ Object
Edit the default capsule.
- #redis ⇒ Object
-
#redis=(hash) ⇒ Object
All capsules must use the same Redis configuration.
- #redis_info ⇒ Object
- #redis_pool ⇒ Object
-
#register(name, instance) ⇒ Object
register global singletons which can be accessed elsewhere.
- #server_middleware {|@server_chain| ... } ⇒ Object
- #to_json ⇒ Object
- #total_concurrency ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Config
Returns a new instance of Config.
61 62 63 64 65 66 67 |
# File 'lib/sidekiq/config.rb', line 61 def initialize( = {}) @options = DEFAULTS.merge() @options[:error_handlers] << ERROR_HANDLER if @options[:error_handlers].empty? @directory = {} @redis_config = {} @capsules = {} end |
Instance Attribute Details
#capsules ⇒ Object (readonly)
Returns the value of attribute capsules.
70 71 72 |
# File 'lib/sidekiq/config.rb', line 70 def capsules @capsules end |
#thread_priority ⇒ Object
Returns the value of attribute thread_priority.
71 72 73 |
# File 'lib/sidekiq/config.rb', line 71 def thread_priority @thread_priority end |
Instance Method Details
#average_scheduled_poll_interval=(interval) ⇒ Object
How frequently Redis should be checked by a random Sidekiq process for scheduled and retriable jobs. Each individual process will take turns by waiting some multiple of this value.
See sidekiq/scheduled.rb for an in-depth explanation of this value
245 246 247 |
# File 'lib/sidekiq/config.rb', line 245 def average_scheduled_poll_interval=(interval) @options[:average_scheduled_poll_interval] = interval end |
#capsule(name) {|cap| ... } ⇒ Object
register a new queue processing subsystem
132 133 134 135 136 137 138 139 140 |
# File 'lib/sidekiq/config.rb', line 132 def capsule(name) nm = name.to_s cap = @capsules.fetch(nm) do cap = Sidekiq::Capsule.new(nm, self) @capsules[nm] = cap end yield cap if block_given? cap end |
#client_middleware {|@client_chain| ... } ⇒ Object
115 116 117 118 119 |
# File 'lib/sidekiq/config.rb', line 115 def client_middleware @client_chain ||= Sidekiq::Middleware::Chain.new(self) yield @client_chain if block_given? @client_chain end |
#concurrency ⇒ Object
89 90 91 |
# File 'lib/sidekiq/config.rb', line 89 def concurrency default_capsule.concurrency end |
#concurrency=(val) ⇒ Object
LEGACY: edits the default capsule config.concurrency = 5
85 86 87 |
# File 'lib/sidekiq/config.rb', line 85 def concurrency=(val) default_capsule.concurrency = Integer(val) end |
#death_handlers ⇒ Object
Death handlers are called when all retries for a job have been exhausted and the job dies. Itβs the notification to your application that this job will not succeed without manual intervention.
Sidekiq.configure_server do |config|
config.death_handlers << ->(job, ex) do
end
end
236 237 238 |
# File 'lib/sidekiq/config.rb', line 236 def death_handlers @options[:death_handlers] end |
#default_capsule(&block) ⇒ Object
127 128 129 |
# File 'lib/sidekiq/config.rb', line 127 def default_capsule(&block) capsule("default", &block) end |
#error_handlers ⇒ Object
Register a proc to handle any error which occurs within the Sidekiq process.
Sidekiq.configure_server do |config|
config.error_handlers << proc {|ex,ctx_hash| MyErrorService.notify(ex, ctx_hash) }
end
The default error handler logs errors to @logger.
256 257 258 |
# File 'lib/sidekiq/config.rb', line 256 def error_handlers @options[:error_handlers] end |
#freeze! ⇒ Object
221 222 223 224 225 |
# File 'lib/sidekiq/config.rb', line 221 def freeze! @directory.freeze @options.freeze true end |
#handle_exception(ex, ctx = {}) ⇒ Object
INTERNAL USE ONLY
300 301 302 303 304 305 306 307 308 309 310 311 312 |
# File 'lib/sidekiq/config.rb', line 300 def handle_exception(ex, ctx = {}) if @options[:error_handlers].size == 0 p ["!!!!!", ex] end @options[:error_handlers].each do |handler| handler.call(ex, ctx, self) rescue Exception => e l = logger l.error "!!! ERROR HANDLER THREW AN ERROR !!!" l.error e l.error e.backtrace.join("\n") unless e.backtrace.nil? end end |
#inspect ⇒ Object
73 74 75 76 77 |
# File 'lib/sidekiq/config.rb', line 73 def inspect "#<#{self.class.name} @options=#{ @options.except(:lifecycle_events, :reloader, :death_handlers, :error_handlers).inspect }>" end |
#logger ⇒ Object
274 275 276 277 278 279 280 281 282 283 |
# File 'lib/sidekiq/config.rb', line 274 def logger @logger ||= Sidekiq::Logger.new($stdout, level: :info).tap do |log| log.level = Logger::INFO log.formatter = if ENV["DYNO"] Sidekiq::Logger::Formatters::WithoutTimestamp.new else Sidekiq::Logger::Formatters::Pretty.new end end end |
#logger=(logger) ⇒ Object
285 286 287 288 289 290 291 292 |
# File 'lib/sidekiq/config.rb', line 285 def logger=(logger) if logger.nil? self.logger.level = Logger::FATAL return end @logger = logger end |
#lookup(name, default_class = nil) ⇒ Object
find a singleton
213 214 215 216 217 218 219 |
# File 'lib/sidekiq/config.rb', line 213 def lookup(name, default_class = nil) # JNDI is just a fancy name for a hash lookup @directory.fetch(name) do |key| return nil unless default_class register(key, default_class.new(self)) end end |
#new_redis_pool(size, name = "unset") ⇒ Object
157 158 159 160 161 |
# File 'lib/sidekiq/config.rb', line 157 def new_redis_pool(size, name = "unset") # connection pool is lazy, it will not create connections unless you actually need them # so don't be skimpy! RedisConnection.create({size: size, logger: logger, pool_name: name}.merge(@redis_config)) end |
#on(event, &block) ⇒ Object
Register a block to run at a point in the Sidekiq lifecycle. :startup, :quiet, :shutdown, or :exit are valid events.
Sidekiq.configure_server do |config|
config.on(:shutdown) do
puts "Goodbye cruel world!"
end
end
268 269 270 271 272 |
# File 'lib/sidekiq/config.rb', line 268 def on(event, &block) raise ArgumentError, "Symbols only please: #{event}" unless event.is_a?(Symbol) raise ArgumentError, "Invalid event name: #{event}" unless @options[:lifecycle_events].key?(event) @options[:lifecycle_events][event] << block end |
#queues ⇒ Object
111 112 113 |
# File 'lib/sidekiq/config.rb', line 111 def queues default_capsule.queues end |
#queues=(val) ⇒ Object
Edit the default capsule. config.queues = %w( high default low ) # strict config.queues = %w( high,3 default,2 low,1 ) # weighted config.queues = %w( feature1,1 feature2,1 feature3,1 ) # random
With weighted priority, queue will be checked first (weight / total) of the time. high will be checked first (3/6) or 50% of the time. Iβd recommend setting weights between 1-10. Weights in the hundreds or thousands are ridiculous and unnecessarily expensive. You can get random queue ordering by explicitly setting all weights to 1.
107 108 109 |
# File 'lib/sidekiq/config.rb', line 107 def queues=(val) default_capsule.queues = val end |
#redis ⇒ Object
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/sidekiq/config.rb', line 179 def redis raise ArgumentError, "requires a block" unless block_given? redis_pool.with do |conn| retryable = true begin yield conn rescue RedisClientAdapter::BaseError => ex # 2550 Failover can cause the server to become a replica, need # to disconnect and reopen the socket to get back to the primary. # 4495 Use the same logic if we have a "Not enough replicas" error from the primary # 4985 Use the same logic when a blocking command is force-unblocked # The same retry logic is also used in client.rb if retryable && ex. =~ /READONLY|NOREPLICAS|UNBLOCKED/ conn.close retryable = false retry end raise end end end |
#redis=(hash) ⇒ Object
All capsules must use the same Redis configuration
143 144 145 |
# File 'lib/sidekiq/config.rb', line 143 def redis=(hash) @redis_config = @redis_config.merge(hash) end |
#redis_info ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/sidekiq/config.rb', line 163 def redis_info redis do |conn| conn.call("INFO") { |i| i.lines(chomp: true).map { |l| l.split(":", 2) }.select { |l| l.size == 2 }.to_h } rescue RedisClientAdapter::CommandError => ex # 2850 return fake version when INFO command has (probably) been renamed raise unless /unknown command/.match?(ex.) { "redis_version" => "9.9.9", "uptime_in_days" => "9999", "connected_clients" => "9999", "used_memory_human" => "9P", "used_memory_peak_human" => "9P" }.freeze end end |
#redis_pool ⇒ Object
147 148 149 |
# File 'lib/sidekiq/config.rb', line 147 def redis_pool Thread.current[:sidekiq_redis_pool] || Thread.current[:sidekiq_capsule]&.redis_pool || local_redis_pool end |
#register(name, instance) ⇒ Object
register global singletons which can be accessed elsewhere
202 203 204 205 206 207 208 209 210 |
# File 'lib/sidekiq/config.rb', line 202 def register(name, instance) # logger.debug("register[#{name}] = #{instance}") # Sidekiq Enterprise lazy registers a few services so we # can't lock down this hash completely. hash = @directory.dup hash[name] = instance @directory = hash.freeze instance end |
#server_middleware {|@server_chain| ... } ⇒ Object
121 122 123 124 125 |
# File 'lib/sidekiq/config.rb', line 121 def server_middleware @server_chain ||= Sidekiq::Middleware::Chain.new(self) yield @server_chain if block_given? @server_chain end |
#to_json ⇒ Object
79 80 81 |
# File 'lib/sidekiq/config.rb', line 79 def to_json(*) Sidekiq.dump_json(@options) end |
#total_concurrency ⇒ Object
93 94 95 |
# File 'lib/sidekiq/config.rb', line 93 def total_concurrency capsules.each_value.sum(&:concurrency) end |