Class: Sidekiq::Config
- Inherits:
-
Object
- Object
- Sidekiq::Config
- Extended by:
- Forwardable
- Defined in:
- lib/sidekiq/config.rb
Overview
Sidekiq::Config represents the global configuration for an instance of Sidekiq.
Constant Summary collapse
# Baseline option values. #initialize merges caller-supplied options over
# this hash, so any key given by the caller replaces the entry below.
DEFAULTS = {
  labels: Set.new,
  require: ".",
  environment: nil,
  concurrency: 5,
  timeout: 25,
  poll_interval_average: nil,
  average_scheduled_poll_interval: 5,
  on_complex_arguments: :raise,
  iteration: {
    max_job_runtime: nil,
    retry_backoff: 0
  },
  error_handlers: [],
  death_handlers: [],
  # Valid event names for #on are exactly the keys of this hash.
  lifecycle_events: {
    startup: [],
    quiet: [],
    shutdown: [],
    # triggers when we fire the first heartbeat on startup OR repairing a network partition
    heartbeat: [],
    # triggers on EVERY heartbeat call, every 10 seconds
    beat: []
  },
  dead_max_jobs: 10_000,
  dead_timeout_in_seconds: 180 * 24 * 60 * 60, # 6 months
  # Default reloader simply invokes the block it is given.
  reloader: proc { |&block| block.call },
  # Default backtrace cleaner returns the backtrace untouched.
  backtrace_cleaner: ->(backtrace) { backtrace }
}
# Default error handler; #initialize installs it when no error handlers
# were supplied.
#
# Logs at WARN level to cfg.logger: the JSON-dumped context (skipped when
# the context is empty), then "ExceptionClass: message", then the backtrace
# after it has been passed through cfg[:backtrace_cleaner] (skipped when the
# exception has no backtrace).
ERROR_HANDLER = ->(ex, ctx, cfg = Sidekiq.default_configuration) {
  l = cfg.logger
  l.warn(Sidekiq.dump_json(ctx)) unless ctx.empty?
  l.warn("#{ex.class.name}: #{ex.message}")
  unless ex.backtrace.nil?
    backtrace = cfg[:backtrace_cleaner].call(ex.backtrace)
    l.warn(backtrace.join("\n"))
  end
}
Instance Attribute Summary collapse
-
#capsules ⇒ Object
readonly
Returns the value of attribute capsules.
Instance Method Summary collapse
-
#average_scheduled_poll_interval=(interval) ⇒ Object
How frequently Redis should be checked by a random Sidekiq process for scheduled and retriable jobs.
-
#capsule(name) {|cap| ... } ⇒ Object
register a new queue processing subsystem.
- #client_middleware {|@client_chain| ... } ⇒ Object
- #concurrency ⇒ Object
-
#concurrency=(val) ⇒ Object
LEGACY: edits the default capsule, e.g. `config.concurrency = 5`.
-
#death_handlers ⇒ Object
Death handlers are called when all retries for a job have been exhausted and the job dies.
- #default_capsule(&block) ⇒ Object
-
#error_handlers ⇒ Object
Register a proc to handle any error which occurs within the Sidekiq process.
- #freeze! ⇒ Object
-
#handle_exception(ex, ctx = {}) ⇒ Object
INTERNAL USE ONLY.
-
#initialize(options = {}) ⇒ Config
constructor
A new instance of Config.
- #logger ⇒ Object
- #logger=(logger) ⇒ Object
-
#lookup(name, default_class = nil) ⇒ Object
find a singleton.
- #new_redis_pool(size, name = "unset") ⇒ Object
-
#on(event, &block) ⇒ Object
Register a block to run at a point in the Sidekiq lifecycle.
- #queues ⇒ Object
-
#queues=(val) ⇒ Object
Edit the default capsule.
- #redis ⇒ Object
-
#redis=(hash) ⇒ Object
All capsules must use the same Redis configuration.
- #redis_info ⇒ Object
- #redis_pool ⇒ Object
-
#register(name, instance) ⇒ Object
register global singletons which can be accessed elsewhere.
- #server_middleware {|@server_chain| ... } ⇒ Object
- #to_json ⇒ Object
- #total_concurrency ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Config
Returns a new instance of Config.
53 54 55 56 57 58 59 |
# File 'lib/sidekiq/config.rb', line 53
#
# Builds a configuration from DEFAULTS overlaid with the given options and
# starts with empty directory, Redis config and capsule tables.
#
# @param options [Hash] overrides merged on top of DEFAULTS
def initialize(options = {})
  @options = DEFAULTS.merge(options)
  # Install the default handler only when none is configured.
  # NOTE(review): Hash#merge is shallow, so unless the caller passes its own
  # :error_handlers array this appends to the array object held by DEFAULTS --
  # confirm that sharing is intended.
  @options[:error_handlers] << ERROR_HANDLER if @options[:error_handlers].empty?
  @directory = {}
  @redis_config = {}
  @capsules = {}
end
Instance Attribute Details
#capsules ⇒ Object (readonly)
Returns the value of attribute capsules.
62 63 64 |
# File 'lib/sidekiq/config.rb', line 62
#
# Read-only accessor for the capsule table (keyed by capsule name, as
# populated by #capsule).
def capsules
  @capsules
end
Instance Method Details
#average_scheduled_poll_interval=(interval) ⇒ Object
How frequently Redis should be checked by a random Sidekiq process for scheduled and retriable jobs. Each individual process will take turns by waiting some multiple of this value.
See sidekiq/scheduled.rb for an in-depth explanation of this value
230 231 232 |
# File 'lib/sidekiq/config.rb', line 230
#
# Stores the given interval under the :average_scheduled_poll_interval option.
#
# @param interval the new value; stored as given, without validation
def average_scheduled_poll_interval=(interval)
  @options[:average_scheduled_poll_interval] = interval
end
#capsule(name) {|cap| ... } ⇒ Object
register a new queue processing subsystem
117 118 119 120 121 122 123 124 125 |
# File 'lib/sidekiq/config.rb', line 117
#
# Returns the capsule registered under +name+, creating and registering a
# new Sidekiq::Capsule on first use. The name is normalized with #to_s, so
# :default and "default" refer to the same capsule.
#
# @param name [#to_s] capsule name
# @yield [cap] the capsule, for configuration, when a block is given
# @return the capsule
def capsule(name)
  nm = name.to_s
  cap = @capsules.fetch(nm) do
    # Not registered yet: build it and remember it under the string name.
    cap = Sidekiq::Capsule.new(nm, self)
    @capsules[nm] = cap
  end
  yield cap if block_given?
  cap
end
#client_middleware {|@client_chain| ... } ⇒ Object
100 101 102 103 104 |
# File 'lib/sidekiq/config.rb', line 100
#
# Returns the client middleware chain, creating it on first access.
#
# @yield [chain] the chain, when a block is given
# @return the memoized Sidekiq::Middleware::Chain
def client_middleware
  @client_chain ||= Sidekiq::Middleware::Chain.new(self)
  yield @client_chain if block_given?
  @client_chain
end
#concurrency ⇒ Object
74 75 76 |
# File 'lib/sidekiq/config.rb', line 74
#
# Concurrency of the default capsule.
def concurrency
  default_capsule.concurrency
end
#concurrency=(val) ⇒ Object
LEGACY: edits the default capsule, e.g. `config.concurrency = 5`
70 71 72 |
# File 'lib/sidekiq/config.rb', line 70
#
# LEGACY: sets the concurrency of the default capsule.
#
# @param val value convertible by Kernel#Integer (e.g. 5 or "5")
# @raise [ArgumentError, TypeError] when Integer(val) rejects the value
def concurrency=(val)
  default_capsule.concurrency = Integer(val)
end
#death_handlers ⇒ Object
Death handlers are called when all retries for a job have been exhausted and the job dies. It’s the notification to your application that this job will not succeed without manual intervention.
Sidekiq.configure_server do |config|
  config.death_handlers << ->(job, ex) do
  end
end
221 222 223 |
# File 'lib/sidekiq/config.rb', line 221
#
# Returns the :death_handlers option. The stored object itself is returned,
# so callers register a handler by appending to it.
def death_handlers
  @options[:death_handlers]
end
#default_capsule(&block) ⇒ Object
112 113 114 |
# File 'lib/sidekiq/config.rb', line 112
#
# Shortcut for capsule("default"); any block is forwarded to #capsule.
def default_capsule(&block)
  capsule("default", &block)
end
#error_handlers ⇒ Object
Register a proc to handle any error which occurs within the Sidekiq process.
Sidekiq.configure_server do |config|
  config.error_handlers << proc {|ex,ctx_hash| MyErrorService.notify(ex, ctx_hash) }
end
The default error handler logs errors to @logger.
241 242 243 |
# File 'lib/sidekiq/config.rb', line 241
#
# Returns the :error_handlers option. The stored object itself is returned,
# so callers register a handler by appending to it.
def error_handlers
  @options[:error_handlers]
end
#freeze! ⇒ Object
206 207 208 209 210 |
# File 'lib/sidekiq/config.rb', line 206
#
# Freezes the directory and options hashes. Only the two top-level hashes
# are frozen; objects stored inside them are not.
#
# @return [true]
def freeze!
  @directory.freeze
  @options.freeze
  true
end
#handle_exception(ex, ctx = {}) ⇒ Object
INTERNAL USE ONLY
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 |
# File 'lib/sidekiq/config.rb', line 285
#
# INTERNAL USE ONLY
#
# Passes the exception to every registered error handler. A handler that
# itself raises is logged and does not stop the remaining handlers.
#
# @param ex [Exception] the error to report
# @param ctx [Hash] context passed along to the handlers
def handle_exception(ex, ctx = {})
  # With no handlers at all, print the exception so it is not lost silently.
  if @options[:error_handlers].size == 0
    p ["!!!!!", ex]
  end
  @options[:error_handlers].each do |handler|
    if parameter_size(handler) == 2
      # TODO Remove in 8.0
      logger.info { "DEPRECATION: Sidekiq exception handlers now take three arguments, see #{handler}" }
      # Two-argument handlers receive the config inside the context hash instead.
      handler.call(ex, {_config: self}.merge(ctx))
    else
      handler.call(ex, ctx, self)
    end
  rescue Exception => e
    # Rescued per handler, so the remaining handlers still run.
    l = logger
    l.error "!!! ERROR HANDLER THREW AN ERROR !!!"
    l.error e
    l.error e.backtrace.join("\n") unless e.backtrace.nil?
  end
end
#logger ⇒ Object
259 260 261 262 263 264 265 266 267 268 |
# File 'lib/sidekiq/config.rb', line 259
#
# Returns the configured logger, lazily creating a default one that writes
# to $stdout at INFO level.
def logger
  @logger ||= Sidekiq::Logger.new($stdout, level: :info).tap do |log|
    log.level = Logger::INFO
    # When the DYNO environment variable is set, use the formatter that omits
    # timestamps; otherwise use the Pretty formatter.
    log.formatter = if ENV["DYNO"]
      Sidekiq::Logger::Formatters::WithoutTimestamp.new
    else
      Sidekiq::Logger::Formatters::Pretty.new
    end
  end
end
#logger=(logger) ⇒ Object
270 271 272 273 274 275 276 277 |
# File 'lib/sidekiq/config.rb', line 270
#
# Replaces the logger. Assigning nil does not remove the logger: the current
# logger is kept and its level is raised to FATAL instead.
#
# @param logger the new logger, or nil to quiet the existing one
def logger=(logger)
  if logger.nil?
    # `self.` is required: the parameter shadows the #logger reader.
    self.logger.level = Logger::FATAL
    return
  end
  @logger = logger
end
#lookup(name, default_class = nil) ⇒ Object
find a singleton
198 199 200 201 202 203 204 |
# File 'lib/sidekiq/config.rb', line 198
#
# Finds a registered singleton by name.
#
# @param name the directory key
# @param default_class [Class, nil] when the name is not registered and this
#   is given, default_class.new(self) is built, registered under the name
#   and returned
# @return the registered instance, or nil when missing and no default_class
def lookup(name, default_class = nil)
  # JNDI is just a fancy name for a hash lookup
  @directory.fetch(name) do |key|
    return nil unless default_class
    register(key, default_class.new(self))
  end
end
#new_redis_pool(size, name = "unset") ⇒ Object
142 143 144 145 146 |
# File 'lib/sidekiq/config.rb', line 142
#
# Creates a new Redis connection pool via RedisConnection.create.
#
# @param size pool size
# @param name pool name, passed as :pool_name
# @return whatever RedisConnection.create returns
def new_redis_pool(size, name = "unset")
  # connection pool is lazy, it will not create connections unless you actually need them
  # so don't be skimpy!
  # @redis_config is merged last, so its keys win over size/logger/pool_name.
  RedisConnection.create({size: size, logger: logger, pool_name: name}.merge(@redis_config))
end
#on(event, &block) ⇒ Object
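Register a block to run at a point in the Sidekiq lifecycle. Valid events are the keys of the :lifecycle_events option; by default these are :startup, :quiet, :shutdown, :heartbeat and :beat. The event must be given as a Symbol.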
Sidekiq.configure_server do |config|
  config.on(:shutdown) do
    puts "Goodbye cruel world!"
  end
end
253 254 255 256 257 |
# File 'lib/sidekiq/config.rb', line 253
#
# Registers a block for a lifecycle event by appending it to that event's
# list in @options[:lifecycle_events].
#
# @param event [Symbol] must be a key of @options[:lifecycle_events]
# @raise [ArgumentError] when event is not a Symbol or not a known event
def on(event, &block)
  raise ArgumentError, "Symbols only please: #{event}" unless event.is_a?(Symbol)
  raise ArgumentError, "Invalid event name: #{event}" unless @options[:lifecycle_events].key?(event)
  @options[:lifecycle_events][event] << block
end
#queues ⇒ Object
96 97 98 |
# File 'lib/sidekiq/config.rb', line 96
#
# Queues of the default capsule.
def queues
  default_capsule.queues
end
#queues=(val) ⇒ Object
Edit the default capsule.
config.queues = %w( high default low ) # strict
config.queues = %w( high,3 default,2 low,1 ) # weighted
config.queues = %w( feature1,1 feature2,1 feature3,1 ) # random
With weighted priority, a queue will be checked first (weight / total) of the time. In the weighted example above, high will be checked first (3/6) or 50% of the time. I’d recommend setting weights between 1-10. Weights in the hundreds or thousands are ridiculous and unnecessarily expensive. You can get random queue ordering by explicitly setting all weights to 1.
92 93 94 |
# File 'lib/sidekiq/config.rb', line 92
#
# Sets the queues of the default capsule; the value is handed to the
# capsule's own queues= writer unchanged.
def queues=(val)
  default_capsule.queues = val
end
#redis ⇒ Object
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/sidekiq/config.rb', line 164
#
# Checks a connection out of #redis_pool and yields it to the block. If the
# block raises a RedisClientAdapter::BaseError whose message matches
# READONLY, NOREPLICAS or UNBLOCKED, the connection is closed and the block
# is run once more; any further error is re-raised.
#
# @yield [conn] a pooled Redis connection
# @return the block's return value
# @raise [ArgumentError] when no block is given
def redis
  raise ArgumentError, "requires a block" unless block_given?
  redis_pool.with do |conn|
    # Allows exactly one retry per call.
    retryable = true
    begin
      yield conn
    rescue RedisClientAdapter::BaseError => ex
      # 2550 Failover can cause the server to become a replica, need
      # to disconnect and reopen the socket to get back to the primary.
      # 4495 Use the same logic if we have a "Not enough replicas" error from the primary
      # 4985 Use the same logic when a blocking command is force-unblocked
      # The same retry logic is also used in client.rb
      if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/
        conn.close
        retryable = false
        retry
      end
      raise
    end
  end
end
#redis=(hash) ⇒ Object
All capsules must use the same Redis configuration
128 129 130 |
# File 'lib/sidekiq/config.rb', line 128
#
# All capsules must use the same Redis configuration.
#
# Merges the given hash into the stored Redis configuration. Existing keys
# not present in +hash+ are kept. A new hash is assigned rather than the old
# one being mutated.
def redis=(hash)
  @redis_config = @redis_config.merge(hash)
end
#redis_info ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/sidekiq/config.rb', line 148
#
# Runs the Redis INFO command and returns its "key:value" lines as a Hash
# of strings. Lines without a colon (section headers, blank lines) are
# dropped; only the first colon splits, so values may themselves contain
# colons.
#
# @return [Hash{String => String}] parsed INFO output, or a frozen hash of
#   placeholder values when Redis reports INFO as an unknown command
# @raise [RedisClientAdapter::CommandError] for any other command error
def redis_info
  redis do |conn|
    conn.call("INFO") do |i|
      i.lines(chomp: true)
        .map { |l| l.split(":", 2) }
        .select { |l| l.size == 2 }
        .to_h
    end
  rescue RedisClientAdapter::CommandError => ex
    # 2850 return fake version when INFO command has (probably) been renamed
    raise unless /unknown command/.match?(ex.message)
    {
      "redis_version" => "9.9.9",
      "uptime_in_days" => "9999",
      "connected_clients" => "9999",
      "used_memory_human" => "9P",
      "used_memory_peak_human" => "9P"
    }.freeze
  end
end
#redis_pool ⇒ Object
132 133 134 |
# File 'lib/sidekiq/config.rb', line 132
#
# Picks the Redis pool for the calling thread, in priority order:
# 1. the thread-local :sidekiq_redis_pool,
# 2. the redis_pool of the thread-local :sidekiq_capsule, if one is set,
# 3. local_redis_pool.
def redis_pool
  Thread.current[:sidekiq_redis_pool] || Thread.current[:sidekiq_capsule]&.redis_pool || local_redis_pool
end
#register(name, instance) ⇒ Object
register global singletons which can be accessed elsewhere
187 188 189 190 191 192 193 194 195 |
# File 'lib/sidekiq/config.rb', line 187
#
# Registers a global singleton which can be accessed elsewhere via #lookup.
#
# @param name the directory key
# @param instance the object to store
# @return the instance that was passed in
def register(name, instance)
  # logger.debug("register[#{name}] = #{instance}")
  # Sidekiq Enterprise lazy registers a few services so we
  # can't lock down this hash completely.
  # Instead of mutating the (possibly frozen) directory, build a copy with the
  # new entry and swap it in as the new frozen directory.
  hash = @directory.dup
  hash[name] = instance
  @directory = hash.freeze
  instance
end
#server_middleware {|@server_chain| ... } ⇒ Object
106 107 108 109 110 |
# File 'lib/sidekiq/config.rb', line 106
#
# Returns the server middleware chain, creating it on first access.
#
# @yield [chain] the chain, when a block is given
# @return the memoized Sidekiq::Middleware::Chain
def server_middleware
  @server_chain ||= Sidekiq::Middleware::Chain.new(self)
  yield @server_chain if block_given?
  @server_chain
end
#to_json ⇒ Object
64 65 66 |
# File 'lib/sidekiq/config.rb', line 64
#
# Serializes the options hash with Sidekiq.dump_json. Any arguments are
# accepted and ignored.
def to_json(*)
  Sidekiq.dump_json(@options)
end
#total_concurrency ⇒ Object
78 79 80 |
# File 'lib/sidekiq/config.rb', line 78
#
# Sum of the concurrency of every registered capsule (0 when there are none).
def total_concurrency
  capsules.each_value.sum(&:concurrency)
end