Class: Redact
Overview
A distributed, dependency-aware job scheduler for Redis. Like distributed make—you define the dependencies between different parts of your job, and Redact handles the scheduling.
Defined Under Namespace
Classes: CyclicDependencyError, TSortHash
Instance Attribute Summary
- #dag ⇒ Object (readonly)
  Returns the value of attribute dag.
Instance Method Summary
- #add_task(what, *deps) ⇒ Object
  Add a task with dependencies.
- #do!(target, run_id, run_params = nil) ⇒ Object
  Schedules target for completion among worker processes listening with #each.
- #done_list_size ⇒ Object
  Returns the total number of completed tasks we have information about.
- #done_tasks(start_idx = 0, end_idx = -1) ⇒ Object
  Returns information representing the set of tasks that have been completed.
- #each(opts = {}) ⇒ Object
  Yields tasks from the queue that are ready for execution.
- #enqueued_tasks(start_idx = 0, end_idx = -1) ⇒ Object
  Returns information representing the set of tasks currently in the queue.
- #in_progress_tasks(start_idx = 0, end_idx = -1) ⇒ Object
  Returns information representing the set of tasks currently in process by worker processes.
- #initialize(redis, opts = {}) ⇒ Redact (constructor)
  Options: namespace: prefix for Redis keys, e.g. “redact/”
- #num_done_tasks ⇒ Object
- #num_enqueued_tasks ⇒ Object
- #num_in_progress_tasks ⇒ Object
- #processing_list_size ⇒ Object
  Returns the total number of outstanding tasks currently being processed.
- #publish_graph! ⇒ Object
  Publish the dependency graph.
- #reset! ⇒ Object
  Drop all data and reset the planner.
- #size ⇒ Object
  Return the total number of outstanding tasks in the queue.
- #visualize(stream = $stdout) ⇒ Object
Constructor Details
#initialize(redis, opts = {}) ⇒ Redact
Options:
- namespace: prefix for Redis keys, e.g. “redact/”
# File 'lib/redact.rb', line 23
def initialize redis, opts={}
  @namespace = opts[:namespace]
  @redis = redis
  @dag = TSortHash.new

  @queue = [@namespace, "q"].join
  @processing_list = [@namespace, "processing"].join
  @done_list = [@namespace, "done"].join
  @dag_key = [@namespace, "dag"].join
  @metadata_prefix = [@namespace, "metadata"].join
  @params_key = [@namespace, "params"].join
end
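The constructor builds every Redis key by joining the namespace and a fixed suffix with no separator, so the namespace should carry its own trailing delimiter. The following is a minimal sketch of that behaviour; redis_key is a hypothetical helper written for illustration and is not part of Redact.

```ruby
# Hypothetical helper mirroring the constructor's key construction:
# the namespace and the suffix are joined with no separator in between.
def redis_key(namespace, suffix)
  [namespace, suffix].join
end

with_slash    = redis_key("redact/", "q") # namespace supplies the delimiter
without_slash = redis_key("redact", "q")  # prefix and suffix run together
no_namespace  = redis_key(nil, "q")       # nil joins as an empty string

puts with_slash, without_slash, no_namespace
```

Under these assumptions, leaving the namespace option out yields bare keys such as "q" and "done", which may collide with other data in the same Redis database.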
Instance Attribute Details
#dag ⇒ Object (readonly)
Returns the value of attribute dag.
# File 'lib/redact.rb', line 36
def dag
  @dag
end
Instance Method Details
#add_task(what, *deps) ⇒ Object
Add a task with dependencies. The what argument is the name of a task (either a symbol or a string). The deps arguments are the tasks that what depends on; as implemented, each one must be a Symbol, otherwise an ArgumentError is raised. deps may refer to tasks not already added by #add_task; these will be automatically added without dependencies.
Raises a CyclicDependencyError exception if adding these dependencies would result in a cyclic dependency.
# File 'lib/redact.rb', line 69
def add_task what, *deps
  deps = deps.flatten # be nice and allow arrays to be passed in
  raise ArgumentError, "expecting dependencies to be zero or more task ids" unless deps.all? { |x| x.is_a?(Symbol) }
  @dag[what] = deps

  @dag.strongly_connected_components.each do |x|
    raise CyclicDependencyError, "cyclic dependency #{x.inspect}" if x.size != 1
  end
end
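The cycle check above relies on strongly connected components: any component with more than one member is a group of tasks that depend on each other. The sketch below reproduces that check with Ruby's standard TSort module. DepHash and first_cycle are hypothetical names, and the sketch assumes TSortHash behaves like a Hash that includes TSort (its definition is not shown on this page).

```ruby
require "tsort"

# Hypothetical stand-in for Redact::TSortHash: a Hash that TSort can walk.
class DepHash < Hash
  include TSort
  alias tsort_each_node each_key

  # Tasks that were only ever mentioned as dependencies have no entry,
  # so treat them as having no dependencies of their own.
  def tsort_each_child(node, &block)
    fetch(node, []).each(&block)
  end
end

# Returns the first group of mutually dependent tasks, or nil if none exists.
def first_cycle(dag)
  dag.strongly_connected_components.find { |component| component.size != 1 }
end

build_graph = DepHash.new
build_graph[:package] = [:compile]
build_graph[:compile] = [:fetch]
acyclic = first_cycle(build_graph) # no cycle yet

build_graph[:fetch] = [:package]   # closes the loop package -> compile -> fetch
cyclic = first_cycle(build_graph)

p acyclic, cyclic
```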
#do!(target, run_id, run_params = nil) ⇒ Object
Schedules target for completion among worker processes listening with #each. Returns immediately.
Targets scheduled with #do! have their tasks dispatched in generally FIFO order; i.e., work for earlier targets will generally be scheduled before work for later targets. Of course, the actual completion order of targets depends on the completion orders of dependent tasks, the time required for these tasks, etc.
You must call #publish_graph! at least once before calling this.
run_id is the unique identifier for this run. Don’t reuse these.
run_params are parameters that will be passed to all tasks in this run. This value will go through JSON round-trips, so should only contain variable types that are expressible with JSON.
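As a rough illustration of what the JSON round trip does to run_params, the sketch below serializes and parses a hash with Ruby's standard json library. The parameter names are made up for the example.

```ruby
require "json"

# Hypothetical run parameters: symbol keys, a symbol value, and a Time.
run_params = { mode: :fast, limit: 10, since: Time.at(0).utc }

# What a worker would see after the value is stored as JSON and parsed back.
received = JSON.parse(run_params.to_json)

p received
```

Symbol keys and symbol values come back as strings, and objects without a native JSON form (such as Time) come back as plain strings, so workers should read params with string keys.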
# File 'lib/redact.rb', line 100
def do! target, run_id, run_params=nil
  raise ArgumentError, "you haven't called publish_graph!" unless @redis.exists(@dag_key)

  dag = load_dag
  target = target.to_s
  raise ArgumentError, "#{target.inspect} is not a recognized task" unless dag.member?(target)

  @redis.hset @params_key, run_id, run_params.to_json if run_params

  dag.each_strongly_connected_component_from(target) do |tasks|
    task = tasks.first # all single-element arrays by this point
    next unless dag[task].nil? || dag[task].empty? # only push tasks without dependencies
    enqueue_task! task, target, run_id, true
  end
end
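The listing above only enqueues tasks that are reachable from the target and have no dependencies of their own; the rest are presumably enqueued later as their dependencies complete. The sketch below shows that first selection step using the module-level TSort functions. GRAPH and initial_tasks are hypothetical, and the string keys are an assumption based on the target.to_s conversion in the listing.

```ruby
require "tsort"

# Hypothetical published graph; string keys assumed, as after a JSON round trip.
GRAPH = {
  "deploy" => ["build", "test"],
  "test"   => ["build"],
  "build"  => ["fetch"],
  "fetch"  => [],
  "docs"   => [] # not a dependency of "deploy", so never visited from it
}.freeze

# Tasks reachable from target that have no dependencies of their own.
def initial_tasks(graph, target)
  each_child = ->(node, &block) { graph.fetch(node, []).each(&block) }
  ready = []
  TSort.each_strongly_connected_component_from(target, each_child) do |tasks|
    task = tasks.first # single-element arrays in an acyclic graph
    ready << task if graph.fetch(task, []).empty?
  end
  ready
end

first_wave = initial_tasks(GRAPH, "deploy")
docs_wave  = initial_tasks(GRAPH, "docs")

p first_wave, docs_wave
```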
#done_list_size ⇒ Object
Returns the total number of completed tasks we have information about.
# File 'lib/redact.rb', line 126
def done_list_size; @redis.llen @done_list end
#done_tasks(start_idx = 0, end_idx = -1) ⇒ Object
Returns information representing the set of tasks that have been completed. Each entry in the returned array is a hash that includes the keys from #in_progress_tasks, plus these keys:
- worker_id: the worker_id of the worker processing this task
- time_waiting: the approximate number of seconds this task was enqueued for
- ts: the timestamp at the end of processing
- state: one of “done”, “skipped”, or “error”
- error, backtrace: debugging information for tasks in state “error”
- time_processing: the approximate number of seconds this task was processed for
# File 'lib/redact.rb', line 159
def done_tasks start_idx=0, end_idx=-1
  @redis.lrange(@done_list, start_idx, end_idx).map { |t| task_summary_for t }
end
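Because start_idx and end_idx are passed straight to Redis LRANGE, both are zero-based and inclusive, and -1 means the last element. A small paging helper could look like the sketch below; page_bounds is a hypothetical name and not part of Redact.

```ruby
# Hypothetical helper: inclusive [start_idx, end_idx] bounds for a zero-based page.
def page_bounds(page, per_page)
  start_idx = page * per_page
  [start_idx, start_idx + per_page - 1]
end

first_page = page_bounds(0, 25)
third_page = page_bounds(2, 25)

p first_page, third_page
```

Given a Redact instance named redact (an assumption for this note), redact.done_tasks(*page_bounds(0, 25)) would request at most the first 25 entries. The same bounds apply to #enqueued_tasks and #in_progress_tasks, which take the same index arguments.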
#each(opts = {}) ⇒ Object
Yields tasks from the queue that are ready for execution. Callers should then perform the work for those tasks. Any exceptions thrown will result in the task being reinserted in the queue and tried at a later point (possibly by another process), unless the retry maximum for that task has been exceeded.
This method downloads the task graph as necessary, so live updates of the graph are possible without restarting worker processes.
opts are:
- blocking: if true, #each will block until items are available (and will never return)
- block_timeout: when blocking is true, the timeout (in seconds) before stopping. A value of nil or 0 will block forever.
- retries: how many times an individual job should be retried before resulting in an error state. Default is 2 (so 3 tries total).
- worker_id: the id of this worker process, for debugging. (If nil, will use a reasonably intelligent default.)
# File 'lib/redact.rb', line 177
def each opts={}
  worker_id = opts[:worker_id] || [Socket.gethostname, $$, $0].join("-")
  retries = opts[:retries] || 2
  blocking = opts[:blocking]
  block_timeout = opts[:block_timeout] || 0

  while true
    token = if blocking
      @redis.brpoplpush @queue, @processing_list, block_timeout
    else
      @redis.rpoplpush @queue, @processing_list
    end

    break unless token # no more tokens!

    ## decompose the token
    task, target, run_id, insertion_time = parse_token token

    ## record that we've seen this
    task, run_id, worker_id: worker_id, time_waiting: (Time.now - insertion_time).to_i

    ## load the target state. abort if we don't need to do anything
    target_state = get_state target, run_id
    if (target_state == :error) || (target_state == :done)
      #log "skipping #{task}##{run_id} because #{target}##{run_id} is in state #{target_state}"
      task, run_id, state: :skipped
      commit! token
      next
    end

    ## get any run params
    params = @redis.hget @params_key, run_id
    params = JSON.parse(params) if params

    ## ok, let's finally try to perform the task
    begin
      #log "performing #{task}##{run_id}"

      ## the task is now in progress
      task, run_id, state: :in_progress

      ## do it
      startt = Time.now
      yield task.to_sym, run_id, params
      elapsed = Time.now - startt

      ## update total running time
      total_time_processing = elapsed + ((task, run_id)[:time_processing] || 0).to_f
      task, run_id, time_processing: total_time_processing

      task, run_id, state: :done
      enqueue_next_tasks! task, target, run_id
      commit! token
    rescue Exception => e
      num_tries = inc_num_tries! task, run_id
      if num_tries > retries # we fail
        target, run_id, state: :error
        task, run_id, state: :error, error: "(#{e.class}) #{e.message}", backtrace: e.backtrace
        commit! token
      else # we'll retry
        uncommit! token
      end

      raise
    end
  end
end
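The retry rule in the listing (fail once num_tries exceeds retries) can be traced with a small in-memory simulation. This is only a sketch under stated assumptions: plain arrays stand in for the Redis lists, the task always fails, a retried task is reinserted at the head of the queue (where uncommit! actually puts it is not shown on this page), and the final re-raise to the caller is omitted so the loop can run to completion.

```ruby
# In-memory stand-ins for the Redis lists (assumption: plain arrays, no Redis).
queue      = ["compile"] # tasks waiting to run
processing = []          # tasks claimed by a worker
finished   = []          # [task, state] pairs
tries      = Hash.new(0) # failed attempts per task
retries    = 2           # the documented default
attempts   = 0

until queue.empty?
  # Claim a task: pop from the tail of the queue, push onto the processing list.
  task = queue.pop
  processing.unshift(task)

  begin
    attempts += 1
    raise "compiler crashed" # hypothetical task body that always fails
  rescue StandardError
    tries[task] += 1
    processing.delete(task)
    if tries[task] > retries
      finished << [task, :error] # retry budget exhausted
    else
      queue.unshift(task)        # put it back for a later attempt
    end
  end
end

p attempts, finished
```

With retries set to 2, the task is attempted three times before it is recorded in the error state, which matches the "3 tries total" wording above.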
#enqueued_tasks(start_idx = 0, end_idx = -1) ⇒ Object
Returns information representing the set of tasks currently in the queue. Each entry in the returned array is a hash that includes, among other things, these keys:
- task: the name of the task
- run_id: the run_id of the task
- target: the target of the task
- ts: the timestamp of queue insertion
# File 'lib/redact.rb', line 134
def enqueued_tasks start_idx=0, end_idx=-1
  @redis.lrange(@queue, start_idx, end_idx).map { |t| task_summary_for t }
end
#in_progress_tasks(start_idx = 0, end_idx = -1) ⇒ Object
Returns information representing the set of tasks currently in process by worker processes. Each entry in the returned array is a hash that includes the keys from #enqueued_tasks, plus these keys:
- worker_id: the worker_id of the worker processing this task
- time_waiting: the approximate number of seconds this task was enqueued for
- ts: the timestamp at the start of processing
# File 'lib/redact.rb', line 145
def in_progress_tasks start_idx=0, end_idx=-1
  @redis.lrange(@processing_list, start_idx, end_idx).map { |t| task_summary_for t }
end
#num_done_tasks ⇒ Object
# File 'lib/redact.rb', line 162
def num_done_tasks; @redis.llen @done_list end
#num_enqueued_tasks ⇒ Object
# File 'lib/redact.rb', line 137
def num_enqueued_tasks; @redis.llen @queue end
#num_in_progress_tasks ⇒ Object
# File 'lib/redact.rb', line 148
def num_in_progress_tasks; @redis.llen @processing_list end
#processing_list_size ⇒ Object
Returns the total number of outstanding tasks currently being processed.
# File 'lib/redact.rb', line 123
def processing_list_size; @redis.llen @processing_list end
#publish_graph! ⇒ Object
Publish the dependency graph. Must be called at least once before #do!.
# File 'lib/redact.rb', line 80
def publish_graph!
  @redis.set @dag_key, @dag.to_json
end
#reset! ⇒ Object
Drop all data and reset the planner.
# File 'lib/redact.rb', line 39
def reset!
  keys = [@queue, @processing_list, @done_list, @dag_key, @params_key]
  keys += @redis.keys("#@metadata_prefix/*")
  keys.each { |k| @redis.del k }
end
#size ⇒ Object
Return the total number of outstanding tasks in the queue. Note that this is only the number of tasks whose dependencies are satisfied (i.e. only those that are currently ready to be performed). Queue size may fluctuate in both directions as targets are built.
# File 'lib/redact.rb', line 120
def size; @redis.llen @queue end
#visualize(stream = $stdout) ⇒ Object
# File 'lib/redact.rb', line 45
def visualize stream=$stdout
  sorted = @dag.tsort
  leaves = sorted.select { |k| @dag[k].nil? || @dag[k].empty? }
  pos = {}
  curpos = 0
  leaves.each do |l|
    string = " #{l} "
    pos[l] = curpos
    stream.print string
    curpos += string.length
  end
  stream.puts
end
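As listed, #visualize prints only the leaf tasks (those without dependencies) on a single line, each padded with one space on either side. The sketch below reproduces that output for a made-up graph, writing to a StringIO instead of $stdout. LeafDemoHash is a hypothetical stand-in that assumes TSortHash is a Hash including Ruby's standard TSort module.

```ruby
require "tsort"
require "stringio"

# Hypothetical stand-in for Redact::TSortHash.
class LeafDemoHash < Hash
  include TSort
  alias tsort_each_node each_key

  def tsort_each_child(node, &block)
    fetch(node, []).each(&block)
  end
end

site_graph = LeafDemoHash.new
site_graph[:site]   = [:pages, :styles]
site_graph[:pages]  = []
site_graph[:styles] = []

# Same selection as the listing: topologically sort, then keep the leaves.
sorted = site_graph.tsort
leaves = sorted.select { |k| site_graph[k].nil? || site_graph[k].empty? }

out = StringIO.new
leaves.each { |leaf| out.print " #{leaf} " }
out.puts
rendered = out.string

print rendered
```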