Module: SideJob
- Defined in:
- lib/sidejob/testing.rb,
lib/sidejob.rb,
lib/sidejob/job.rb,
lib/sidejob/port.rb,
lib/sidejob/worker.rb,
lib/sidejob/version.rb,
lib/sidejob/server_middleware.rb
Overview
Helpers for testing.
Defined Under Namespace
Modules: JobMethods, Worker
Classes: Job, Port, ServerMiddleware
Constant Summary
- CONFIGURATION =
Configuration parameters
{
  lock_expiration: 60,      # workers should not run longer than this number of seconds
  max_runs_per_minute: 600, # terminate jobs that run too often
}
- VERSION =
The current SideJob version
'4.1.2'
Class Method Summary
- .context(data, &block) ⇒ Object
Adds to the current SideJob context within the block.
- .find(name_or_id) ⇒ SideJob::Job?
Finds a job by name or id.
- .log(entry) ⇒ Object
Publishes a log message using the current SideJob context.
- .publish(channel, message) ⇒ Object
Publishes a message up the channel hierarchy to jobs by writing to ports subscribed to the channel.
- .queue(queue, klass, args: nil, as: nil, parent: nil, name: nil, at: nil, by: nil, inports: nil, outports: nil) ⇒ SideJob::Job
Main function to queue a job.
- .redis ⇒ Object
Returns the Redis connection. If a block is given, yields the connection to it; otherwise returns the connection itself.
- .redis=(redis) ⇒ Object
- .timestamp ⇒ String
Returns the current timestamp as an ISO 8601 string.
Class Method Details
.context(data, &block) ⇒ Object
Adds to the current SideJob context within the block.
# File 'lib/sidejob.rb', line 119
def self.context(data, &block)
  previous = Thread.current[:sidejob_context]
  Thread.current[:sidejob_context] = (previous || {}).merge(data.symbolize_keys)
  yield
ensure
  Thread.current[:sidejob_context] = previous
end
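The save, merge, and restore pattern used by `.context` can be tried in isolation. The following is a minimal sketch, not SideJob's code: `with_context` and `current_context` are hypothetical names, the thread-local key `:demo_context` is an assumption, and keys are not symbolized (the original relies on `symbolize_keys`, which comes from ActiveSupport).

```ruby
# Hypothetical stand-in for SideJob.context, using its own thread-local key.
# Unlike the original, it does not symbolize the keys of `data`.
def with_context(data)
  previous = Thread.current[:demo_context]
  Thread.current[:demo_context] = (previous || {}).merge(data)
  yield
ensure
  # Runs even if the block raises, so the outer context is always restored.
  Thread.current[:demo_context] = previous
end

def current_context
  Thread.current[:demo_context]
end

seen = []
with_context(job: 1) do
  with_context(channel: '/a') do
    seen << current_context # inner block sees both keys
  end
  seen << current_context   # back to the outer context
end
seen << current_context     # nothing is set outside any block
```

Because the restore happens in `ensure`, nested calls unwind in order and an exception inside the block does not leak context into later code on the same thread.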
.find(name_or_id) ⇒ SideJob::Job?
Finds a job by name or id.
# File 'lib/sidejob.rb', line 85
def self.find(name_or_id)
  SideJob::Job.new(name_or_id) rescue nil
end
.log(entry) ⇒ Object
Publishes a log message using the current SideJob context.
# File 'lib/sidejob.rb', line 97
def self.log(entry)
  context = (Thread.current[:sidejob_context] || {}).merge(timestamp: SideJob.timestamp)

  if entry.is_a?(Exception)
    exception = entry
    entry = { error: exception.message }
    if exception.backtrace
      # only store the backtrace until the first sidekiq line
      entry[:backtrace] = exception.backtrace.take_while {|l| l !~ /sidekiq/}.join("\n")
    end
  elsif entry.is_a?(String)
    entry = { message: entry }
  end

  # Disable logging to prevent infinite publish loop for input ports subscribed to /sidejob/log which could generate log entries
  SideJob::Port.group(log: false) do
    SideJob.publish '/sidejob/log', context.merge(entry)
  end
end
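To see what shape of hash ends up on `/sidejob/log`, the entry normalization can be separated from the publishing step. This is an illustrative sketch only: `normalize_log_entry` is a hypothetical helper, and it assumes that any entry which is neither an `Exception` nor a `String` is already a hash.

```ruby
# Hypothetical helper mirroring how the entry argument is turned into a hash.
def normalize_log_entry(entry)
  case entry
  when Exception
    normalized = { error: entry.message }
    if entry.backtrace
      # Keep frames only up to (not including) the first line mentioning sidekiq.
      normalized[:backtrace] = entry.backtrace.take_while { |l| l !~ /sidekiq/ }.join("\n")
    end
    normalized
  when String
    { message: entry }
  else
    entry # assumed to already be a hash
  end
end

error = RuntimeError.new('boom')
error.set_backtrace(['app/worker.rb:10', 'gems/sidekiq/processor.rb:5', 'bin/run:1'])

string_entry = normalize_log_entry('started')
error_entry  = normalize_log_entry(error)
```

Here `string_entry` holds only a `:message` key, while `error_entry` holds `:error` plus a `:backtrace` truncated before the first Sidekiq frame. In `.log` itself, the result is then merged over the current context and a `timestamp`.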
.publish(channel, message) ⇒ Object
Publishes a message up the channel hierarchy to jobs by writing to ports subscribed to the channel. The message is also published via normal Redis pub/sub, but only to the destination channel itself and not to its parent channels.
# File 'lib/sidejob.rb', line 131
def self.publish(channel, message)
  # We don't publish at every level up hierarchy via redis pubsub since a client can use redis psubscribe
  SideJob.redis.publish channel, message.to_json

  job_subs = {}

  # Set the context to the original channel so that a job that subscribes to a higher channel can determine
  # the original channel that the message was sent to.
  SideJob.context({channel: channel}) do
    # walk up the channel hierarchy
    Pathname.new(channel).ascend do |channel|
      channel = channel.to_s
      jobs = SideJob.redis.smembers "channel:#{channel}"
      jobs.each do |id|
        job = SideJob.find(id)
        if ! job_subs.has_key?(id)
          job_subs[id] = {}
          if job
            SideJob.redis.hgetall("#{job.redis_key}:inports:channels").each_pair do |port, channels|
              channels = JSON.parse(channels)
              channels.each do |ch|
                job_subs[id][ch] ||= []
                job_subs[id][ch] << port
              end
            end
          end
        end

        if job && job_subs[id] && job_subs[id][channel]
          job_subs[id][channel].each do |port|
            job.input(port).write message
          end
        else
          # Job is gone or no longer subscribed to this channel
          SideJob.redis.srem "channel:#{channel}", id
        end
      end
    end
  end
end
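The hierarchy walk relies on `Pathname#ascend` from the Ruby standard library, which treats the channel name like a file path. As a small sketch (the helper name `channel_hierarchy` is hypothetical), this lists the channels whose subscribers are checked for a given publish, most specific first:

```ruby
require 'pathname'

# Hypothetical helper: lists every channel level visited when publishing to
# `channel`, starting with the channel itself and ending at the root.
def channel_hierarchy(channel)
  levels = []
  Pathname.new(channel).ascend { |path| levels << path.to_s }
  levels
end

levels = channel_hierarchy('/sidejob/log')
# levels == ["/sidejob/log", "/sidejob", "/"]
```

A job subscribed to `/sidejob` therefore also receives messages sent to `/sidejob/log`, and it can read the `channel` key of the context to find out where the message was originally published.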
.queue(queue, klass, args: nil, as: nil, parent: nil, name: nil, at: nil, by: nil, inports: nil, outports: nil) ⇒ SideJob::Job
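Main function to queue a job. Raises an error if no worker is registered for klass in the given queue, and returns nil without queuing anything if the alias passed as `as` is already in use.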
# File 'lib/sidejob.rb', line 49
def self.queue(queue, klass, args: nil, as: nil, parent: nil, name: nil, at: nil, by: nil, inports: nil, outports: nil)
  raise "No worker registered for #{klass} in queue #{queue}" unless SideJob::Worker.config(queue, klass)

  return nil if as && SideJob.redis.hget('jobs:aliases', as)

  # To prevent race conditions, we generate the id and set all data in redis before queuing the job to sidekiq
  # Otherwise, sidekiq may start the job too quickly
  id = SideJob.redis.incr('jobs:last_id')
  SideJob.redis.sadd 'jobs', id
  job = SideJob::Job.new(id)

  redis_key = job.redis_key
  SideJob.redis.multi do |multi|
    multi.set "#{redis_key}:worker", {queue: queue, class: klass, args: args}.to_json
    multi.set "#{redis_key}:status", 'completed'
    multi.set "#{redis_key}:created_at", SideJob.timestamp
    multi.set "#{redis_key}:created_by", by
  end

  if parent
    raise 'Missing name option for job with a parent' unless name
    parent.adopt(job, name)
  end

  # initialize ports
  job.inports = inports
  job.outports = outports

  job.add_alias(as) if as

  job.run(at: at)
end
.redis ⇒ Object
Returns the Redis connection. If a block is given, yields the connection to it; otherwise returns the connection itself.
# File 'lib/sidejob.rb', line 22
def self.redis
  Sidekiq.redis do |redis|
    if block_given?
      yield redis
    else
      redis
    end
  end
end
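The two calling styles can be illustrated without Sidekiq or a Redis server. In this sketch `FakePool`, `POOL`, and `connection` are hypothetical stand-ins (Sidekiq's real pool is not used), and the symbol `:connection` takes the place of a Redis client:

```ruby
# FakePool is a hypothetical stand-in for a connection pool that only
# hands out its connection inside a block.
class FakePool
  def with
    yield :connection
  end
end

POOL = FakePool.new

# Same shape as SideJob.redis: forward to the caller's block when one is
# given, otherwise return the connection object itself.
def connection
  POOL.with do |conn|
    if block_given?
      yield conn
    else
      conn
    end
  end
end

direct  = connection                         # no block: the connection is returned
yielded = connection { |c| c.to_s.upcase }   # block: the block's value is returned
```

In the block form, the method returns whatever the block returns, which is why calls such as `SideJob.redis.publish channel, ...` and block-style usage can both work through the same method.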
.redis=(redis) ⇒ Object
# File 'lib/sidejob.rb', line 33
def self.redis=(redis)
  Sidekiq.redis = redis
end
.timestamp ⇒ String
Returns the current UTC time as an ISO 8601 string with nanosecond precision (nine fractional digits).
# File 'lib/sidejob.rb', line 91
def self.timestamp
  Time.now.utc.iso8601(9)
end
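For reference, the string format can be checked with the standard library alone; `Time#iso8601` requires `require 'time'`. The variable names below are illustrative only:

```ruby
require 'time'

# A fixed instant makes the format easy to inspect.
fixed = Time.utc(2020, 1, 2, 3, 4, 5).iso8601(9)
# fixed == "2020-01-02T03:04:05.000000000Z"

# The same call SideJob.timestamp makes, followed by a round trip through the parser.
stamp  = Time.now.utc.iso8601(9)
parsed = Time.iso8601(stamp)
```

Because every timestamp is in UTC and has the same width, sorting the strings lexicographically also sorts them chronologically.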