Module: SideJob

Defined in:
lib/sidejob/testing.rb,
lib/sidejob.rb,
lib/sidejob/job.rb,
lib/sidejob/port.rb,
lib/sidejob/worker.rb,
lib/sidejob/version.rb,
lib/sidejob/server_middleware.rb

Overview

Helpers for testing

Defined Under Namespace

Modules: JobMethods, Worker

Classes: Job, Port, ServerMiddleware

Constant Summary

CONFIGURATION =

Configuration parameters

{
    lock_expiration: 60, # workers should not run longer than this number of seconds
    max_runs_per_minute: 600, # terminate jobs that run too often
}

VERSION =

The current SideJob version

'4.1.2'

Class Method Details

.context(data, &block) ⇒ Object

Adds to the current SideJob context within the block.

Parameters:

  • data (Hash)

    Data to be merged into the current context



# File 'lib/sidejob.rb', line 119

def self.context(data, &block)
  previous = Thread.current[:sidejob_context]
  Thread.current[:sidejob_context] = (previous || {}).merge(data.symbolize_keys)
  yield
ensure
  Thread.current[:sidejob_context] = previous
end
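
The merge-and-restore behavior above can be tried without SideJob or Redis. The following is a minimal standalone sketch of the same thread-local pattern; ContextDemo, its current helper, and the :demo_context key are hypothetical names introduced for illustration and are not part of SideJob.

```ruby
# Standalone sketch of the pattern used by SideJob.context.
# ContextDemo and :demo_context are hypothetical, not SideJob API.
module ContextDemo
  def self.context(data)
    previous = Thread.current[:demo_context]
    # Merge the new data over whatever an enclosing block already set.
    Thread.current[:demo_context] = (previous || {}).merge(data)
    yield
  ensure
    # Restore the outer context even if the block raises.
    Thread.current[:demo_context] = previous
  end

  def self.current
    Thread.current[:demo_context] || {}
  end
end

seen = []
ContextDemo.context(user: 'alice') do
  ContextDemo.context(request: 7) do
    seen << ContextDemo.current # inner block sees both keys
  end
  seen << ContextDemo.current   # the inner key is gone again
end
seen << ContextDemo.current     # nothing is left outside the blocks
```

Because the previous value is restored in an ensure clause, nested calls unwind cleanly and a raising block does not leak its context into later code on the same thread.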

.find(name_or_id) ⇒ SideJob::Job?

Finds a job by name or id.

Parameters:

  • name_or_id (String, Integer)

    Job name or id

Returns:

  • (SideJob::Job, nil)

    Job object or nil if it doesn’t exist



# File 'lib/sidejob.rb', line 85

def self.find(name_or_id)
  SideJob::Job.new(name_or_id) rescue nil
end

.log(entry) ⇒ Object

Publishes a log message using the current SideJob context.

Parameters:

  • entry (Hash|Exception|String)

    Log entry



# File 'lib/sidejob.rb', line 97

def self.log(entry)
  context = (Thread.current[:sidejob_context] || {}).merge(timestamp: SideJob.timestamp)

  if entry.is_a?(Exception)
    exception = entry
    entry = { error: exception.message }
    if exception.backtrace
      # only store the backtrace until the first sidekiq line
      entry[:backtrace] = exception.backtrace.take_while {|l| l !~ /sidekiq/}.join("\n")
    end
  elsif entry.is_a?(String)
    entry = { message: entry }
  end

  # Disable logging to prevent infinite publish loop for input ports subscribed to /sidejob/log which could generate log entries
  SideJob::Port.group(log: false) do
    SideJob.publish '/sidejob/log', context.merge(entry)
  end
end
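
The way an entry is turned into a hash before publishing can be sketched in isolation. The helper below, normalize_entry, is a hypothetical name that mirrors only the normalization step of the method above; it leaves out the context merge and the publish call.

```ruby
# Hypothetical helper mirroring the normalization step in SideJob.log.
def normalize_entry(entry)
  case entry
  when Exception
    result = { error: entry.message }
    if entry.backtrace
      # Keep frames only up to the first one that mentions sidekiq.
      result[:backtrace] = entry.backtrace.take_while { |l| l !~ /sidekiq/ }.join("\n")
    end
    result
  when String
    { message: entry }
  else
    entry # hashes pass through unchanged
  end
end

error = RuntimeError.new('boom')
error.set_backtrace(['app/worker.rb:3', 'gems/sidekiq/processor.rb:10', 'bin/run:1'])

from_string = normalize_entry('started')
from_error  = normalize_entry(error)
from_hash   = normalize_entry({ status: 'ok' })
```

Here from_error keeps only the app/worker.rb:3 frame, because take_while stops at the first line matching sidekiq and drops everything after it.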

.publish(channel, message) ⇒ Object

Publishes a message up the channel hierarchy by writing it to every job input port subscribed to the channel or to one of its ancestors. The message is also published through normal Redis pubsub, but only on the destination channel itself.

Parameters:

  • channel (String)

    Channel is path-like, separated by / to indicate hierarchy

  • message (Object)

    JSON encodable message



# File 'lib/sidejob.rb', line 131

def self.publish(channel, message)
  # We don't publish at every level up hierarchy via redis pubsub since a client can use redis psubscribe
  SideJob.redis.publish channel, message.to_json

  job_subs = {}

  # Set the context to the original channel so that a job that subscribes to a higher channel can determine
  # the original channel that the message was sent to.
  SideJob.context({channel: channel}) do
    # walk up the channel hierarchy
    Pathname.new(channel).ascend do |channel|
      channel = channel.to_s
      jobs = SideJob.redis.smembers "channel:#{channel}"
      jobs.each do |id|
        job = SideJob.find(id)
        if ! job_subs.has_key?(id)
          job_subs[id] = {}
          if job
            SideJob.redis.hgetall("#{job.redis_key}:inports:channels").each_pair do |port, channels|
              channels = JSON.parse(channels)
              channels.each do |ch|
                job_subs[id][ch] ||= []
                job_subs[id][ch] << port
              end
            end
          end
        end

        if job && job_subs[id] && job_subs[id][channel]
          job_subs[id][channel].each do |port|
            job.input(port).write message
          end
        else
          # Job is gone or no longer subscribed to this channel
          SideJob.redis.srem "channel:#{channel}", id
        end
      end
    end
  end
end
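
The hierarchy walk relies on Pathname#ascend, which visits a path and then each of its parents. A minimal sketch of which channels are visited for a given destination, using an in-memory hash in place of Redis; channel_hierarchy and the subscriptions data are hypothetical and only illustrate the traversal order.

```ruby
require 'pathname'

# Hypothetical helper: list the channels visited when publishing to channel.
def channel_hierarchy(channel)
  levels = []
  Pathname.new(channel).ascend { |path| levels << path.to_s }
  levels
end

levels = channel_hierarchy('/sidejob/log')

# Hypothetical in-memory stand-in for the channel subscriptions kept in Redis.
subscriptions = { '/sidejob' => ['job 1'], '/other' => ['job 2'] }
recipients = levels.flat_map { |ch| subscriptions.fetch(ch, []) }
```

A subscriber on /sidejob therefore receives a message sent to /sidejob/log, while a subscriber on /other does not.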

.queue(queue, klass, args: nil, as: nil, parent: nil, name: nil, at: nil, by: nil, inports: nil, outports: nil) ⇒ SideJob::Job

Main function to queue a job

Parameters:

  • queue (String)

    Name of the queue to put the job in

  • klass (String)

    Name of the class that will handle the job

  • args (Array) (defaults to: nil)

    additional args to pass to the worker’s perform method (default none)

  • as (String) (defaults to: nil)

Alias to add to the job. If the name is already taken, no new job is queued and nil is returned.

  • parent (SideJob::Job) (defaults to: nil)

    parent job

  • name (String) (defaults to: nil)

    Name of child job (required if parent specified)

  • at (Time, Float) (defaults to: nil)

    Time to schedule the job, otherwise queue immediately

  • by (String) (defaults to: nil)

    Who created this job. Recommend <type>:<id> format for non-jobs as SideJob uses job:<id>.

  • inports (Hash{Symbol,String => Hash}) (defaults to: nil)

    Input port configuration. Port name to options.

  • outports (Hash{Symbol,String => Hash}) (defaults to: nil)

    Output port configuration. Port name to options.

Returns:

  • (SideJob::Job, nil)

    The job, or nil if the alias given in as is already taken



# File 'lib/sidejob.rb', line 49

def self.queue(queue, klass, args: nil, as: nil, parent: nil, name: nil, at: nil, by: nil, inports: nil, outports: nil)
  raise "No worker registered for #{klass} in queue #{queue}" unless SideJob::Worker.config(queue, klass)

  return nil if as && SideJob.redis.hget('jobs:aliases', as)

  # To prevent race conditions, we generate the id and set all data in redis before queuing the job to sidekiq
  # Otherwise, sidekiq may start the job too quickly
  id = SideJob.redis.incr('jobs:last_id')
  SideJob.redis.sadd 'jobs', id
  job = SideJob::Job.new(id)

  redis_key = job.redis_key
  SideJob.redis.multi do |multi|
    multi.set "#{redis_key}:worker", {queue: queue, class: klass, args: args}.to_json
    multi.set "#{redis_key}:status", 'completed'
    multi.set "#{redis_key}:created_at", SideJob.timestamp
    multi.set "#{redis_key}:created_by", by
  end

  if parent
    raise 'Missing name option for job with a parent' unless name
    parent.adopt(job, name)
  end

  # initialize ports
  job.inports = inports
  job.outports = outports

  job.add_alias(as) if as

  job.run(at: at)
end
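
A hedged usage sketch for the options documented above. The queue name reports, the worker class ReportWorker, the alias, and the port names are all hypothetical, and the call only runs when SideJob is loaded and a worker has been registered for that queue and class (otherwise the method raises). Port options are left empty because their format is not described in this section.

```ruby
# All names below are hypothetical examples, not values defined by SideJob.
options = {
  args: [2024, 'monthly'], # passed on to the worker's perform method
  as: 'monthly-report',    # alias; if already taken, nil is returned
  by: 'user:42',           # <type>:<id> format recommended for non-jobs
  at: Time.now + 60,       # schedule one minute from now
  inports: { input: {} },  # port name => options (left empty here)
  outports: { output: {} },
}

# Guarded so the snippet is harmless when SideJob is not loaded.
if defined?(SideJob) && SideJob.respond_to?(:queue)
  job = SideJob.queue('reports', 'ReportWorker', **options)
  puts(job ? 'queued' : "alias #{options[:as]} is already taken")
end
```

Checking the return value for nil matters whenever as is given, since that is the only signal that no new job was queued.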

.redis ⇒ Object

Returns the Redis connection. If a block is given, yields the connection to the block; otherwise, returns the connection.



# File 'lib/sidejob.rb', line 22

def self.redis
  Sidekiq.redis do |redis|
    if block_given?
      yield redis
    else
      redis
    end
  end
end

.redis=(redis) ⇒ Object

Parameters:

  • redis (Hash)

    Options for passing to Redis.new



# File 'lib/sidejob.rb', line 33

def self.redis=(redis)
  Sidekiq.redis = redis
end

.timestamp ⇒ String

Returns the current UTC timestamp as an ISO 8601 string with nanosecond precision

Returns:

  • (String)

    Current timestamp



# File 'lib/sidejob.rb', line 91

def self.timestamp
  Time.now.utc.iso8601(9)
end
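
The shape of the returned string can be checked outside SideJob. A minimal sketch, assuming only Ruby's standard time library, which provides Time#iso8601 and Time.iso8601:

```ruby
require 'time' # provides Time#iso8601 and Time.iso8601

# Same expression as in SideJob.timestamp: UTC, 9 fractional digits.
stamp = Time.now.utc.iso8601(9)

# The string round-trips through the standard ISO 8601 parser.
parsed = Time.iso8601(stamp)
```

The argument 9 requests nine fractional-second digits, so the string always has the form YYYY-MM-DDTHH:MM:SS.nnnnnnnnnZ.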