Class: Rjob::Context

Inherits:
Object
  • Object
show all
Defined in:
lib/rjob/context.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Context

Returns a new instance of Context.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/rjob/context.rb', line 38

def initialize(config)
  @config = config.dup
  @pool_size = @config.fetch(:redis_pool_size, 10)

  @bucket_count = config.fetch(:bucket_count, 32)
  @prefix = config.fetch(:prefix, 'rjob')
  @logger = config[:logger]
  @job_wrapper_proc = config[:job_wrapper_proc]
  @script_runner = Rjob::Scripts::ScriptRunner.new
  @recurring_jobs = nil

  if config.key?(:recurring_jobs)
    require "rjob/recurring"

    @recurring_jobs = config[:recurring_jobs].map do |defn|
      Rjob::RecurringJob.from_definition(self, defn)
    end
  end

  initialize_connection_pool
  load_redis_scripts
end

Instance Attribute Details

#bucket_countObject (readonly)

Returns the value of attribute bucket_count.



6
7
8
# File 'lib/rjob/context.rb', line 6

def bucket_count
  @bucket_count
end

#configObject (readonly)

Returns the value of attribute config.



4
5
6
# File 'lib/rjob/context.rb', line 4

def config
  @config
end

#job_wrapper_procObject (readonly)

Returns the value of attribute job_wrapper_proc.



8
9
10
# File 'lib/rjob/context.rb', line 8

def job_wrapper_proc
  @job_wrapper_proc
end

#loggerObject (readonly)

Returns the value of attribute logger.



7
8
9
# File 'lib/rjob/context.rb', line 7

def logger
  @logger
end

#prefixObject (readonly)

Returns the value of attribute prefix.



5
6
7
# File 'lib/rjob/context.rb', line 5

def prefix
  @prefix
end

#recurring_jobsObject (readonly)

Returns the value of attribute recurring_jobs.



10
11
12
# File 'lib/rjob/context.rb', line 10

def recurring_jobs
  @recurring_jobs
end

#script_runnerObject (readonly)

Returns the value of attribute script_runner.



9
10
11
# File 'lib/rjob/context.rb', line 9

def script_runner
  @script_runner
end

Class Method Details

.configure {|config| ... } ⇒ Object

Available options:

:redis - (passed to Redis.new) :max_threads - paralallelism :bucket_count - defaults to 32 :redis_pool_size - redis connection pool size. Defaults to 10 :prefix - defaults to “rjob” :job_wrapper_proc - defaults to none :logger - duck-typed Logger, defaults to nil

Yields:



31
32
33
34
35
36
# File 'lib/rjob/context.rb', line 31

def self.configure
  raise "Already configured!: #{@instance}" if @instance
  config = {}
  yield(config)
  set_instance(new(config))
end

.instanceObject



12
13
14
15
# File 'lib/rjob/context.rb', line 12

def self.instance
  return @instance if @instance
  raise "Rjob is not configured. Please call Rjob.configure first"
end

.set_instance(instance) ⇒ Object



17
18
19
# File 'lib/rjob/context.rb', line 17

def self.set_instance(instance)
  @instance = instance
end

Instance Method Details

#create_redis_connectionObject



115
116
117
118
# File 'lib/rjob/context.rb', line 115

def create_redis_connection
  redis_args = @config[:redis]
  Redis.new(redis_args)
end

#dead_job_countObject



61
62
63
# File 'lib/rjob/context.rb', line 61

def dead_job_count()
  redis { |r| r.llen("#{@prefix}:dead") }
end

#demodularize_class(name) ⇒ Object



107
108
109
110
111
112
113
# File 'lib/rjob/context.rb', line 107

def demodularize_class(name)
  const = Kernel
  name.split('::').each do |n|
    const = const.const_get(n)
  end
  const
end

#enqueue_job(job_class, args) ⇒ Object



86
87
88
# File 'lib/rjob/context.rb', line 86

def enqueue_job(job_class, args)
  redis(&method(:enqueue_job_with_redis).curry[job_class, args])
end

#enqueue_job_with_redis(job_class, args, r) ⇒ Object



90
91
92
93
# File 'lib/rjob/context.rb', line 90

def enqueue_job_with_redis(job_class, args, r)
  job_data = MessagePack.pack([job_class.to_s, args])
  @script_runner.exec(r, :enqueue_job, [], [@prefix, @bucket_count, job_data])
end

#fetch_worker_class(class_name:) ⇒ Object



103
104
105
# File 'lib/rjob/context.rb', line 103

def fetch_worker_class(class_name:)
  demodularize_class(class_name)
end

#get_dead_jobs(count = 1, offset = 0, keyname: 'ohh-dead') ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/rjob/context.rb', line 65

def get_dead_jobs(count=1, offset=0, keyname: 'ohh-dead')
  redis do |r|
    # dead_jobs = r.lrange("#{@prefix}:dead", offset, count < 0 ? -1 : (offset + count - 1))
    dead_jobs = r.lrange(keyname, offset, count < 0 ? -1 : (offset + count - 1))
    dead_jobs.map do |error_payload|
      payload = MessagePack.unpack(error_payload)

      {
        job: Rjob::Job.deserialize(self, payload['job']),
        when: Time.at(payload['when']),
        error_class: payload['error_class'],
        full_message: payload['message'],
      }
    end
  end
end

#redis(&block) ⇒ Object



82
83
84
# File 'lib/rjob/context.rb', line 82

def redis(&block)
  @pool.with(&block)
end

#schedule_job_at(timestamp, job_class, args) ⇒ Object



95
96
97
98
99
100
101
# File 'lib/rjob/context.rb', line 95

def schedule_job_at(timestamp, job_class, args)
  job_data = MessagePack.pack([job_class.to_s, args])

  redis do |r|
    @script_runner.exec(r, :schedule_job_at, [], [timestamp.to_s, job_data, @prefix, @bucket_count])
  end
end