Module: Faktory

Defined in:
lib/faktory/io.rb,
lib/faktory.rb,
lib/faktory/cli.rb,
lib/faktory/cli.rb,
lib/faktory/job.rb,
lib/faktory/util.rb,
lib/faktory/batch.rb,
lib/faktory/fetch.rb,
lib/faktory/rails.rb,
lib/faktory/client.rb,
lib/faktory/mutate.rb,
lib/faktory/logging.rb,
lib/faktory/manager.rb,
lib/faktory/testing.rb,
lib/faktory/version.rb,
lib/faktory/launcher.rb,
lib/faktory/tracking.rb,
lib/faktory/processor.rb,
lib/faktory/connection.rb,
lib/faktory/job_logger.rb,
lib/faktory/middleware/chain.rb,
lib/faktory/exception_handler.rb

Overview

Faktory’s MUTATE API allows you to scan the sorted sets within Redis (retries, scheduled, dead) and take action (delete, enqueue, kill) on entries.

require ‘faktory/mutate’ cl = Faktory::Client.new cl.discard(Faktory::RETRIES) do |filter|

filter.with_type("QuickBooksSyncJob")
filter.matching("*uid:12345*"))

end

Defined Under Namespace

Modules: ExceptionHandler, Job, Logging, Middleware, Mutator, Queues, ReadTimeout, Testing, Trackable, Util Classes: BaseError, Batch, BatchStatus, CLI, Client, CommandError, Connection, EmptyQueueError, Fetcher, JobLogger, Launcher, Manager, ParseError, Processor, Rails, Shutdown, TimeoutError, UnitOfWork

Constant Summary collapse

NAME =
"Faktory"
LICENSE =
"See LICENSE and the LGPL-3.0 for licensing details."
DEFAULTS =
{
  queues: ["default"],
  concurrency: 10,
  require: ".",
  environment: "development",
  # As of 2017, Heroku's process timeout is 30 seconds.
  # After 30 seconds, processes are KILLed so assume 25
  # seconds to gracefully shutdown and 5 seconds to hard
  # shutdown.
  timeout: 25,
  error_handlers: [],
  lifecycle_events: {
    startup: [],
    quiet: [],
    shutdown: []
  },
  reloader: proc { |&block| block.call }
}
DEFAULT_JOB_OPTIONS =
{
  "retry" => 25,
  "queue" => "default"
}
RETRIES =

Valid targets

"retries"
SCHEDULED =
"scheduled"
DEAD =
"dead"
VERSION =
"2.1.2"

Class Method Summary collapse

Class Method Details

.client_middleware {|@client_chain| ... } ⇒ Object

Yields:

  • (@client_chain)


91
92
93
94
95
# File 'lib/faktory.rb', line 91

def self.client_middleware
  @client_chain ||= Middleware::Chain.new
  yield @client_chain if block_given?
  @client_chain
end

.configure_client {|_self| ... } ⇒ Object

Configuration for Faktory client, use like:

Faktory.configure_client do |config|
  config.default_job_options = { retry: 3 }
end

Yields:

  • (_self)

Yield Parameters:

  • _self (Faktory)

    the object that the method was called on



68
69
70
# File 'lib/faktory.rb', line 68

def self.configure_client
  yield self unless worker?
end

.configure_worker {|_self| ... } ⇒ Object

Configuration for Faktory executor, use like:

Faktory.configure_worker do |config|
  config.worker_middleware do |chain|
    chain.add MyServerHook
  end
  config.default_job_options = { retry: 3 }
end

Yields:

  • (_self)

Yield Parameters:

  • _self (Faktory)

    the object that the method was called on



58
59
60
# File 'lib/faktory.rb', line 58

def self.configure_worker
  yield self if worker?
end

.default_job_optionsObject



108
109
110
# File 'lib/faktory.rb', line 108

def self.default_job_options
  defined?(@default_job_options) ? @default_job_options : DEFAULT_JOB_OPTIONS
end

.default_job_options=(hash) ⇒ Object



103
104
105
106
# File 'lib/faktory.rb', line 103

def self.default_job_options=(hash)
  # stringify
  @default_job_options = default_job_options.merge(hash.map { |k, v| [k.to_s, v] }.to_h)
end

.dump_json(object) ⇒ Object



116
117
118
# File 'lib/faktory.rb', line 116

def self.dump_json(object)
  JSON.generate(object)
end

.error_handlersObject

Register a proc to handle any error which occurs within the Faktory process.

Faktory.configure_worker do |config|
  config.error_handlers << proc {|ex,ctx_hash| MyErrorService.notify(ex, ctx_hash) }
end

The default error handler logs errors to Faktory.logger.



144
145
146
# File 'lib/faktory.rb', line 144

def self.error_handlers
  options[:error_handlers]
end

.faktory=(hash) ⇒ Object



87
88
89
# File 'lib/faktory.rb', line 87

def self.faktory=(hash)
  @pool = Faktory::Connection.create(hash)
end

.load_json(string) ⇒ Object



112
113
114
# File 'lib/faktory.rb', line 112

def self.load_json(string)
  JSON.parse(string)
end

.loggerObject



120
121
122
# File 'lib/faktory.rb', line 120

def self.logger
  Faktory::Logging.logger
end

.logger=(log) ⇒ Object



124
125
126
# File 'lib/faktory.rb', line 124

def self.logger=(log)
  Faktory::Logging.logger = log
end

.on(event, &block) ⇒ Object

Register a block to run at a point in the Faktory lifecycle. :startup, :quiet or :shutdown are valid events.

Faktory.configure_worker do |config|
  config.on(:shutdown) do
    puts "Goodbye cruel world!"
  end
end

Raises:



156
157
158
159
160
# File 'lib/faktory.rb', line 156

def self.on(event, &block)
  raise ArgumentError, "Symbols only please: #{event}" unless event.is_a?(Symbol)
  raise ArgumentError, "Invalid event name: #{event}" unless options[:lifecycle_events].key?(event)
  options[:lifecycle_events][event] << block
end

.optionsObject



41
42
43
# File 'lib/faktory.rb', line 41

def self.options
  @options ||= DEFAULTS.dup
end

.options=(opts) ⇒ Object



45
46
47
# File 'lib/faktory.rb', line 45

def self.options=(opts)
  @options = opts
end

.serverObject

Raises:



76
77
78
79
80
81
# File 'lib/faktory.rb', line 76

def self.server
  raise ArgumentError, "requires a block" unless block_given?
  server_pool.with do |conn|
    yield conn
  end
end

.server_poolObject



83
84
85
# File 'lib/faktory.rb', line 83

def self.server_pool
  @pool ||= Faktory::Connection.create
end

.worker?Boolean



72
73
74
# File 'lib/faktory.rb', line 72

def self.worker?
  defined?(Faktory::CLI)
end

.worker_middleware {|@worker_chain| ... } ⇒ Object

Yields:

  • (@worker_chain)


97
98
99
100
101
# File 'lib/faktory.rb', line 97

def self.worker_middleware
  @worker_chain ||= Middleware::Chain.new
  yield @worker_chain if block_given?
  @worker_chain
end

.💃🕺(io = $stdout) ⇒ Object



128
129
130
131
132
133
134
135
# File 'lib/faktory.rb', line 128

def self.