Class: Rjob::Context
- Inherits:
-
Object
- Object
- Rjob::Context
- Defined in:
- lib/rjob/context.rb
Instance Attribute Summary collapse
-
#bucket_count ⇒ Object
readonly
Returns the value of attribute bucket_count.
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#job_wrapper_proc ⇒ Object
readonly
Returns the value of attribute job_wrapper_proc.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#prefix ⇒ Object
readonly
Returns the value of attribute prefix.
-
#recurring_jobs ⇒ Object
readonly
Returns the value of attribute recurring_jobs.
-
#script_runner ⇒ Object
readonly
Returns the value of attribute script_runner.
Class Method Summary collapse
-
.configure {|config| ... } ⇒ Object
Available options:.
- .instance ⇒ Object
- .set_instance(instance) ⇒ Object
Instance Method Summary collapse
- #create_redis_connection ⇒ Object
- #dead_job_count ⇒ Object
- #demodularize_class(name) ⇒ Object
- #enqueue_job(job_class, args) ⇒ Object
- #enqueue_job_with_redis(job_class, args, r) ⇒ Object
- #fetch_worker_class(class_name:) ⇒ Object
- #get_dead_jobs(count = 1, offset = 0, keyname: 'ohh-dead') ⇒ Object
-
#initialize(config) ⇒ Context
constructor
A new instance of Context.
- #redis(&block) ⇒ Object
- #schedule_job_at(timestamp, job_class, args) ⇒ Object
Constructor Details
#initialize(config) ⇒ Context
Returns a new instance of Context.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/rjob/context.rb', line 38 def initialize(config) @config = config.dup @pool_size = @config.fetch(:redis_pool_size, 10) @bucket_count = config.fetch(:bucket_count, 32) @prefix = config.fetch(:prefix, 'rjob') @logger = config[:logger] @job_wrapper_proc = config[:job_wrapper_proc] @script_runner = Rjob::Scripts::ScriptRunner.new @recurring_jobs = nil if config.key?(:recurring_jobs) require "rjob/recurring" @recurring_jobs = config[:recurring_jobs].map do |defn| Rjob::RecurringJob.from_definition(self, defn) end end initialize_connection_pool load_redis_scripts end |
Instance Attribute Details
#bucket_count ⇒ Object (readonly)
Returns the value of attribute bucket_count.
6 7 8 |
# File 'lib/rjob/context.rb', line 6 def bucket_count @bucket_count end |
#config ⇒ Object (readonly)
Returns the value of attribute config.
4 5 6 |
# File 'lib/rjob/context.rb', line 4 def config @config end |
#job_wrapper_proc ⇒ Object (readonly)
Returns the value of attribute job_wrapper_proc.
8 9 10 |
# File 'lib/rjob/context.rb', line 8 def job_wrapper_proc @job_wrapper_proc end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
7 8 9 |
# File 'lib/rjob/context.rb', line 7 def logger @logger end |
#prefix ⇒ Object (readonly)
Returns the value of attribute prefix.
5 6 7 |
# File 'lib/rjob/context.rb', line 5 def prefix @prefix end |
#recurring_jobs ⇒ Object (readonly)
Returns the value of attribute recurring_jobs.
10 11 12 |
# File 'lib/rjob/context.rb', line 10 def recurring_jobs @recurring_jobs end |
#script_runner ⇒ Object (readonly)
Returns the value of attribute script_runner.
9 10 11 |
# File 'lib/rjob/context.rb', line 9 def script_runner @script_runner end |
Class Method Details
.configure {|config| ... } ⇒ Object
Available options:
:redis - (passed to Redis.new) :max_threads - paralallelism :bucket_count - defaults to 32 :redis_pool_size - redis connection pool size. Defaults to 10 :prefix - defaults to “rjob” :job_wrapper_proc - defaults to none :logger - duck-typed Logger, defaults to nil
31 32 33 34 35 36 |
# File 'lib/rjob/context.rb', line 31 def self.configure raise "Already configured!: #{@instance}" if @instance config = {} yield(config) set_instance(new(config)) end |
.instance ⇒ Object
12 13 14 15 |
# File 'lib/rjob/context.rb', line 12 def self.instance return @instance if @instance raise "Rjob is not configured. Please call Rjob.configure first" end |
.set_instance(instance) ⇒ Object
17 18 19 |
# File 'lib/rjob/context.rb', line 17 def self.set_instance(instance) @instance = instance end |
Instance Method Details
#create_redis_connection ⇒ Object
115 116 117 118 |
# File 'lib/rjob/context.rb', line 115 def create_redis_connection redis_args = @config[:redis] Redis.new(redis_args) end |
#dead_job_count ⇒ Object
61 62 63 |
# File 'lib/rjob/context.rb', line 61 def dead_job_count() redis { |r| r.llen("#{@prefix}:dead") } end |
#demodularize_class(name) ⇒ Object
107 108 109 110 111 112 113 |
# File 'lib/rjob/context.rb', line 107 def demodularize_class(name) const = Kernel name.split('::').each do |n| const = const.const_get(n) end const end |
#enqueue_job(job_class, args) ⇒ Object
86 87 88 |
# File 'lib/rjob/context.rb', line 86 def enqueue_job(job_class, args) redis(&method(:enqueue_job_with_redis).curry[job_class, args]) end |
#enqueue_job_with_redis(job_class, args, r) ⇒ Object
90 91 92 93 |
# File 'lib/rjob/context.rb', line 90 def enqueue_job_with_redis(job_class, args, r) job_data = MessagePack.pack([job_class.to_s, args]) @script_runner.exec(r, :enqueue_job, [], [@prefix, @bucket_count, job_data]) end |
#fetch_worker_class(class_name:) ⇒ Object
103 104 105 |
# File 'lib/rjob/context.rb', line 103 def fetch_worker_class(class_name:) demodularize_class(class_name) end |
#get_dead_jobs(count = 1, offset = 0, keyname: 'ohh-dead') ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/rjob/context.rb', line 65 def get_dead_jobs(count=1, offset=0, keyname: 'ohh-dead') redis do |r| # dead_jobs = r.lrange("#{@prefix}:dead", offset, count < 0 ? -1 : (offset + count - 1)) dead_jobs = r.lrange(keyname, offset, count < 0 ? -1 : (offset + count - 1)) dead_jobs.map do |error_payload| payload = MessagePack.unpack(error_payload) { job: Rjob::Job.deserialize(self, payload['job']), when: Time.at(payload['when']), error_class: payload['error_class'], full_message: payload['message'], } end end end |
#redis(&block) ⇒ Object
82 83 84 |
# File 'lib/rjob/context.rb', line 82 def redis(&block) @pool.with(&block) end |
#schedule_job_at(timestamp, job_class, args) ⇒ Object
95 96 97 98 99 100 101 |
# File 'lib/rjob/context.rb', line 95 def schedule_job_at(, job_class, args) job_data = MessagePack.pack([job_class.to_s, args]) redis do |r| @script_runner.exec(r, :schedule_job_at, [], [.to_s, job_data, @prefix, @bucket_count]) end end |