Class: Redact

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/redact.rb

Overview

A distributed, dependency-aware job scheduler for Redis. It works like a distributed make: you define the dependencies between the parts of your job, and Redact handles the scheduling.

Defined Under Namespace

Classes: CyclicDependencyError, TSortHash

Constructor Details

#initialize(redis, opts = {}) ⇒ Redact

Options:

  • namespace: prefix for Redis keys, e.g. “redact/”



# File 'lib/redact.rb', line 23

def initialize redis, opts={}
  @namespace = opts[:namespace]
  @redis = redis
  @dag = TSortHash.new

  @queue = [@namespace, "q"].join
  @processing_list = [@namespace, "processing"].join
  @done_list = [@namespace, "done"].join
  @dag_key = [@namespace, "dag"].join
  @metadata_prefix = [@namespace, "metadata"].join
  @params_key = [@namespace, "params"].join
end
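Because the constructor builds every key with [@namespace, suffix].join, the namespace is prepended verbatim with no separator, so a trailing "/" (as in "redact/") has to be part of the option value. The sketch below reproduces that naming so you can see which keys an instance will use; redact_key_names is a hypothetical helper, not part of Redact.

```ruby
# Hypothetical helper (not part of Redact) mirroring how #initialize builds
# its Redis key names: namespace and suffix are joined with no separator.
def redact_key_names(namespace)
  suffixes = {
    queue: "q",
    processing_list: "processing",
    done_list: "done",
    dag_key: "dag",
    metadata_prefix: "metadata",
    params_key: "params"
  }
  # Array#join turns nil into "", so a nil namespace leaves the bare suffixes.
  suffixes.transform_values { |suffix| [namespace, suffix].join }
end

p redact_key_names("redact/")[:queue]    # => "redact/q"
p redact_key_names(nil)[:queue]          # => "q"
p redact_key_names("jobs")[:params_key]  # => "jobsparams"
```

Per-task metadata keys live under the metadata prefix followed by a slash ("redact/metadata/..." in the first case), which is the pattern #reset! deletes.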

Instance Attribute Details

#dag ⇒ Object (readonly)

Returns the in-memory dependency graph (a TSortHash) that #add_task builds and #publish_graph! uploads to Redis.



# File 'lib/redact.rb', line 36

def dag
  @dag
end

Instance Method Details

#add_task(what, *deps) ⇒ Object

Adds a task with dependencies. The argument what is the name of a task (either a symbol or a string). The deps arguments are the tasks that what depends on; they must be symbols, and arrays of symbols are flattened. Deps may refer to tasks not yet added by #add_task; these are added automatically with no dependencies of their own.

Raises a CyclicDependencyError exception if adding these dependencies would result in a cyclic dependency.

Raises:

  • (ArgumentError) if any dependency is not a Symbol


# File 'lib/redact.rb', line 69

def add_task what, *deps
  deps = deps.flatten # be nice and allow arrays to be passed in
  raise ArgumentError, "expecting dependencies to be zero or more task ids" unless deps.all? { |x| x.is_a?(Symbol) }
  @dag[what] = deps

  @dag.strongly_connected_components.each do |x|
    raise CyclicDependencyError, "cyclic dependency #{x.inspect}" if x.size != 1
  end
end
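add_task detects cycles with TSort's strongly_connected_components: in an acyclic graph every strongly connected component is a single task, so any larger component indicates a cycle. The body of TSortHash is not shown on this page; the sketch below assumes it is a Hash that includes Ruby's TSort module. DepGraph and cyclic? are hypothetical names used only for illustration.

```ruby
require "tsort"

# Hypothetical stand-in for Redact::TSortHash (assumed design): a Hash from
# task name to its list of dependencies, with TSort's two hooks defined.
class DepGraph < Hash
  include TSort

  alias tsort_each_node each_key

  # Tasks that appear only as dependencies have no entry of their own;
  # treat them as having no dependencies.
  def tsort_each_child(node, &block)
    fetch(node, []).each(&block)
  end
end

# The same test add_task applies: a component with more than one task
# means the graph contains a cycle.
def cyclic?(graph)
  graph.strongly_connected_components.any? { |component| component.size != 1 }
end

g = DepGraph.new
g[:deploy] = [:build, :test]
g[:test] = [:build]
p cyclic?(g)  # => false
p g.tsort     # => [:build, :test, :deploy]

g[:build] = [:deploy]  # build -> deploy -> build
p cyclic?(g)  # => true
```

Note that add_task stores @dag[what] before running this check, so after CyclicDependencyError is raised the offending entry is still present in the in-memory graph.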

#do!(target, run_id, run_params = nil) ⇒ Object

Schedules target for completion among worker processes listening with #each. Returns immediately.

Targets scheduled with #do! have their tasks dispatched in generally FIFO order; i.e., work for earlier targets will generally be scheduled before work for later targets. Of course, the actual completion order of targets depends on the completion orders of dependent tasks, the time required for these tasks, etc.

You must call #publish_graph! at least once before calling this.

run_id is the unique identifier for this run. Don’t reuse these.

run_params are parameters that are passed to every task in this run. The value goes through a JSON round-trip, so it should contain only values that are expressible in JSON.
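do! stores run_params with to_json, and #each hands the block the result of JSON.parse. Doing that round-trip yourself is a quick way to see what tasks will actually receive; this sketch uses only Ruby's json library. In particular, symbol values and symbol keys both come back as strings.

```ruby
require "json"

# Symbols do not survive JSON: values and hash keys both come back as Strings.
run_params = { mode: :full, limit: 10, tags: [:a, :b] }

stored = run_params.to_json    # the form do! writes to Redis
received = JSON.parse(stored)  # the form #each yields to the block

p received["mode"]  # => "full"
p received["tags"]  # => ["a", "b"]
p received[:mode]   # => nil (keys are now Strings)
```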

Raises:

  • (ArgumentError)


# File 'lib/redact.rb', line 100

def do! target, run_id, run_params=nil
  raise ArgumentError, "you haven't called publish_graph!" unless @redis.exists(@dag_key)

  dag = load_dag
  target = target.to_s
  raise ArgumentError, "#{target.inspect} is not a recognized task" unless dag.member?(target)

  @redis.hset @params_key, run_id, run_params.to_json if run_params

  dag.each_strongly_connected_component_from(target) do |tasks|
    task = tasks.first # all single-element arrays by this point
    next unless dag[task].nil? || dag[task].empty? # only push tasks without dependencies
    enqueue_task! task, target, run_id, true
  end
end
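The loop at the end of do! enqueues only those tasks reachable from the target that have no dependencies; the remaining tasks are presumably enqueued later as their dependencies complete (via enqueue_next_tasks!, whose body is not shown here). The sketch below reproduces just the selection step on an in-memory graph with String keys, since the published graph has been through JSON. LoadedDag and initial_tasks are hypothetical names, and LoadedDag is an assumed stand-in for whatever load_dag returns.

```ruby
require "tsort"

# Assumed stand-in for the graph returned by load_dag: String task names
# mapped to arrays of dependency names.
class LoadedDag < Hash
  include TSort
  alias tsort_each_node each_key

  def tsort_each_child(node, &block)
    (self[node] || []).each(&block)
  end
end

# Mirrors do!: walk everything reachable from target and keep the tasks
# that have no dependencies, since only those can start immediately.
def initial_tasks(dag, target)
  ready = []
  dag.each_strongly_connected_component_from(target) do |tasks|
    task = tasks.first # single-element components once cycles are ruled out
    ready << task if dag[task].nil? || dag[task].empty?
  end
  ready
end

dag = LoadedDag.new
dag["site"] = ["html", "assets"]
dag["assets"] = ["css", "js"]
dag["js"] = ["npm_install"]
dag["report"] = ["stats"] # not reachable from "site", so never enqueued for it

p initial_tasks(dag, "site").sort  # => ["css", "html", "npm_install"]
```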

#done_list_size ⇒ Object

Returns the total number of completed tasks we have information about.



# File 'lib/redact.rb', line 126

def done_list_size; @redis.llen @done_list end

#done_tasks(start_idx = 0, end_idx = -1) ⇒ Object

Returns information about the tasks that have been completed, as an array of hashes (one per task). Each hash includes the keys from #in_progress_tasks, plus these keys:

  • worker_id: the worker_id of the worker that processed this task
  • time_waiting: the approximate number of seconds this task was enqueued
  • ts: the timestamp at the end of processing
  • state: one of “done”, “skipped”, or “error”
  • error, backtrace: debugging information for tasks in state “error”
  • time_processing: the approximate number of seconds this task was processed for

start_idx and end_idx select a slice of the underlying Redis list with LRANGE semantics (inclusive, with negative indices counting from the end), so the defaults return every entry.



# File 'lib/redact.rb', line 159

def done_tasks start_idx=0, end_idx=-1
  @redis.lrange(@done_list, start_idx, end_idx).map { |t| task_summary_for t }
end

#each(opts = {}) ⇒ Object

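Yields tasks from the queue that are ready for execution, passing the block the task name (as a symbol), the run_id, and the run's parameters (nil if none were given to #do!). The block should perform the work for that task. If the block raises an exception, the task is reinserted into the queue and retried later (possibly by another process), unless it has exceeded its retry limit, in which case both the task and its target are marked as errors. Either way, #each then re-raises the exception.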

This method downloads the task graph as necessary, so live updates of the graph are possible without restarting worker processes.

opts are:

  • blocking: if true, #each waits for items to become available instead of returning when the queue is empty. With the default block_timeout, it never returns.

  • block_timeout: when blocking is true, the number of seconds to wait for an item before returning. A value of nil or 0 blocks forever.

  • retries: how many times an individual task is retried before it is put into the error state. The default is 2 (so 3 tries total).

  • worker_id: the ID of this worker process, for debugging. If nil, it defaults to the hostname, process ID, and program name joined with hyphens.
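#each compares the value returned by inc_num_tries! against retries and gives up once the count exceeds it. The body of inc_num_tries! is not shown; this sketch assumes it returns the new count, starting at 1. Counting attempts for a task that fails every time shows where "3 tries total" comes from. attempts_until_error is a hypothetical helper.

```ruby
# Simulates a task that raises on every attempt and reports how many
# attempts #each makes before marking it as an error.
def attempts_until_error(retries)
  num_tries = 0
  loop do
    num_tries += 1 # stands in for inc_num_tries! (assumed to return the new count)
    return num_tries if num_tries > retries # #each marks task and target as errors here
    # otherwise #each calls uncommit! and the task is retried later
  end
end

p attempts_until_error(2)  # => 3 (the default retries value)
p attempts_until_error(0)  # => 1 (error on the first failure)
```

Because #each re-raises the exception on every failed attempt, a worker that should keep processing has to call #each again, for example from an outer loop that rescues and logs the error.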



# File 'lib/redact.rb', line 177

def each opts={}
  worker_id = opts[:worker_id] || [Socket.gethostname, $$, $0].join("-")
  retries = opts[:retries] || 2
  blocking = opts[:blocking]
  block_timeout = opts[:block_timeout] || 0

  while true
    token = if blocking
      @redis.brpoplpush @queue, @processing_list, block_timeout
    else
      @redis.rpoplpush @queue, @processing_list
    end
    break unless token # no more tokens!

    ## decompose the token
    task, target, run_id, insertion_time = parse_token token

    ## record that we've seen this
    set_metadata! task, run_id, worker_id: worker_id, time_waiting: (Time.now - insertion_time).to_i

    ## load the target state. abort if we don't need to do anything
    target_state = get_state target, run_id
    if (target_state == :error) || (target_state == :done)
      #log "skipping #{task}##{run_id} because #{target}##{run_id} is in state #{target_state}"
      set_metadata! task, run_id, state: :skipped
      commit! token
      next
    end

    ## get any run params
    params = @redis.hget @params_key, run_id
    params = JSON.parse(params) if params
    
    ## ok, let's finally try to perform the task
    begin
      #log "performing #{task}##{run_id}"

      ## the task is now in progress
      set_metadata! task, run_id, state: :in_progress

      ## do it
      startt = Time.now
      yield task.to_sym, run_id, params
      elapsed = Time.now - startt

      ## update total running time
      total_time_processing = elapsed + ((task, run_id)[:time_processing] || 0).to_f
      set_metadata! task, run_id, time_processing: total_time_processing

      set_metadata! task, run_id, state: :done
      enqueue_next_tasks! task, target, run_id
      commit! token
    rescue Exception => e
      num_tries = inc_num_tries! task, run_id
      if num_tries > retries # we fail
        set_metadata! target, run_id, state: :error
        set_metadata! task, run_id, state: :error, error: "(#{e.class}) #{e.message}", backtrace: e.backtrace
        commit! token
      else # we'll retry
        uncommit! token
      end

      raise
    end
  end
end
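#each relies on the Redis list-rotation pattern: RPOPLPUSH (or BRPOPLPUSH when blocking) atomically moves a token from the tail of the queue to the head of the processing list, and the token leaves the processing list only when commit! or uncommit! is called. The sketch below models the three lists with Ruby arrays so the flow can be traced without Redis. ListModel is hypothetical, and the behavior of commit! (processing list to done list) and uncommit! (processing list back to the head of the queue) is an assumption, since their bodies are not shown here.

```ruby
# In-memory model of the queue, processing list, and done list.
# Index 0 is the head of each list (the LPUSH side).
class ListModel
  attr_reader :queue, :processing, :done

  def initialize(tokens)
    @queue = tokens.dup
    @processing = []
    @done = []
  end

  # RPOPLPUSH: pop from the tail of the queue, push onto the head of processing.
  def rpoplpush
    token = @queue.pop
    @processing.unshift(token) if token
    token
  end

  # Assumed commit!: the task finished (done, skipped, or error).
  def commit!(token)
    @processing.delete(token)
    @done.unshift(token)
  end

  # Assumed uncommit!: put the token back so it is retried later.
  def uncommit!(token)
    @processing.delete(token)
    @queue.unshift(token)
  end
end

lists = ListModel.new(%w[c b a]) # "a" sits at the tail, so it is popped first
failed_once = false
log = []
while (token = lists.rpoplpush) # nil means the queue is empty: stop, as #each does
  if token == "b" && !failed_once
    failed_once = true
    lists.uncommit!(token) # simulated exception on the first attempt
    log << "#{token}:retry"
  else
    lists.commit!(token)
    log << "#{token}:done"
  end
end

p log  # => ["a:done", "b:retry", "c:done", "b:done"]
```

Since the token is moved rather than removed, a worker that dies mid-task leaves its token in the processing list, where it remains visible through #in_progress_tasks.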

#enqueued_tasks(start_idx = 0, end_idx = -1) ⇒ Object

Returns information about the tasks currently in the queue, as an array of hashes (one per task). Each hash includes, among other things, these keys:

  • task: the name of the task
  • run_id: the run_id of the task
  • target: the target of the task
  • ts: the timestamp of queue insertion



# File 'lib/redact.rb', line 134

def enqueued_tasks start_idx=0, end_idx=-1
  @redis.lrange(@queue, start_idx, end_idx).map { |t| task_summary_for t }
end

#in_progress_tasks(start_idx = 0, end_idx = -1) ⇒ Object

Returns information about the tasks currently being processed by worker processes, as an array of hashes (one per task). Each hash includes the keys from #enqueued_tasks, plus these keys:

  • worker_id: the worker_id of the worker processing this task
  • time_waiting: the approximate number of seconds this task was enqueued
  • ts: the timestamp at the start of processing



# File 'lib/redact.rb', line 145

def in_progress_tasks start_idx=0, end_idx=-1
  @redis.lrange(@processing_list, start_idx, end_idx).map { |t| task_summary_for t }
end

#num_done_tasks ⇒ Object

Returns the number of tasks in the done list; equivalent to #done_list_size.


# File 'lib/redact.rb', line 162

def num_done_tasks; @redis.llen @done_list end

#num_enqueued_tasks ⇒ Object

Returns the number of tasks in the queue; equivalent to #size.


# File 'lib/redact.rb', line 137

def num_enqueued_tasks; @redis.llen @queue end

#num_in_progress_tasks ⇒ Object

Returns the number of tasks currently being processed; equivalent to #processing_list_size.


# File 'lib/redact.rb', line 148

def num_in_progress_tasks; @redis.llen @processing_list end

#processing_list_size ⇒ Object

Returns the total number of outstanding tasks currently being processed.



# File 'lib/redact.rb', line 123

def processing_list_size; @redis.llen @processing_list end

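#publish_graph! ⇒ Object

Publishes the dependency graph built with #add_task to Redis, where #do! and worker processes read it. Must be called at least once before #do!, and again after any change to the graph.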



# File 'lib/redact.rb', line 80

def publish_graph!
  @redis.set @dag_key, @dag.to_json
end

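#reset! ⇒ Object

Drops all data and resets the planner: deletes the queue, the processing and done lists, the published graph, the run parameters, and all task metadata keys from Redis. The in-memory graph built with #add_task is not cleared.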



# File 'lib/redact.rb', line 39

def reset!
  keys = [@queue, @processing_list, @done_list, @dag_key, @params_key]
  keys += @redis.keys("#@metadata_prefix/*")
  keys.each { |k| @redis.del k }
end

#size ⇒ Object

Returns the number of tasks currently in the queue. This counts only tasks whose dependencies are satisfied (i.e., only those that are ready to be performed now), so the queue size may fluctuate in both directions as targets are built.



# File 'lib/redact.rb', line 120

def size; @redis.llen @queue end

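#visualize(stream = $stdout) ⇒ Object

Prints the leaf tasks of the in-memory graph (those with no dependencies), in topological order, on a single line to stream.
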

# File 'lib/redact.rb', line 45

def visualize stream=$stdout
  sorted = @dag.tsort
  leaves = sorted.select { |k| @dag[k].nil? || @dag[k].empty? }

  pos = {}
  curpos = 0
  leaves.each do |l|
    string = " #{l} "
    pos[l] = curpos
    stream.print string
    curpos += string.length
  end
  stream.puts
end
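Because visualize only calls print and puts on stream, any IO-like object works, which makes a StringIO convenient for capturing the output in tests. In the version shown, the pos hash is filled in but never read, so the output is just the leaf names. The sketch below repeats that logic on a standalone graph; VizDag and print_leaves are hypothetical, and VizDag assumes TSortHash is a Hash that includes TSort.

```ruby
require "stringio"
require "tsort"

# Hypothetical stand-in for the TSortHash held in Redact#dag.
class VizDag < Hash
  include TSort
  alias tsort_each_node each_key

  def tsort_each_child(node, &block)
    (self[node] || []).each(&block)
  end
end

# Same output as Redact#visualize: leaf tasks in topological order, one line.
def print_leaves(dag, stream = $stdout)
  leaves = dag.tsort.select { |k| dag[k].nil? || dag[k].empty? }
  leaves.each { |leaf| stream.print " #{leaf} " }
  stream.puts
end

dag = VizDag.new
dag[:site] = [:css, :js]
out = StringIO.new
print_leaves(dag, out)
p out.string  # => " css  js \n"
```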