Module: Resque::MockExt

Defined in:
lib/resque/mock.rb

Instance Method Summary collapse

Instance Method Details

#add_job(data) ⇒ Object



82
83
84
# File 'lib/resque/mock.rb', line 82

def add_job(data)
  @worker_manager[:jobs] << data
end

#asyncObject



9
10
11
12
13
14
15
16
# File 'lib/resque/mock.rb', line 9

def async
  @async = true
  create_worker_manager
  yield
ensure
  wait_for_worker_manager
  @async = false
end

#create_worker_managerObject



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/resque/mock.rb', line 39

def create_worker_manager
  @worker_manager = Thread.new do
    Thread.current.abort_on_exception = true
    worker_threads = []

    while true
      break if Thread.current[:exit] && worker_threads.empty? && Thread.current[:jobs].empty?

      worker_threads.reject! {|t| !t.alive? }

      while Thread.current[:jobs] && job_data = Thread.current[:jobs].shift
        worker_threads << create_worker_thread_for(job_data)
      end

      sleep 0.5
    end
  end.tap {|t| t[:jobs] = [] }
end

#create_worker_thread_for(data) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/resque/mock.rb', line 64

def create_worker_thread_for(data)
  Thread.new(data) do |data|
    Thread.current.abort_on_exception = true
    if delay = data['delay']
      sleep delay
    end

    klass = data['payload']['class']
    puts "Mock perform: #{klass}.perform(*#{data['payload']['args'].inspect})" if ENV['VERBOSE']
    klass.perform(*roundtrip(data['payload']['args']))
    puts "Mock exit: #{klass}.perform(*#{data['payload']['args'].inspect})" if ENV['VERBOSE']
  end
end

#defer(klass, args, delay = nil) ⇒ Object



28
29
30
31
32
33
34
35
36
37
# File 'lib/resque/mock.rb', line 28

def defer(klass, args, delay = nil)
  validate(klass)

  if @async
    add_job('payload' => { 'class' => klass, 'args' => args }, 'delay' => delay)
  else
    sleep delay if delay
    klass.perform(*roundtrip(args))
  end
end

#enqueue(klass, *args) ⇒ Object



18
19
20
21
# File 'lib/resque/mock.rb', line 18

def enqueue(klass, *args)
  puts "Mock enqueue: async=#{!!@async}, stack_depth=#{caller.size}, #{klass}, #{args.inspect}" if ENV['VERBOSE']
  defer(klass, args)
end

#enqueue_in(delay, klass, *args) ⇒ Object



23
24
25
26
# File 'lib/resque/mock.rb', line 23

def enqueue_in(delay, klass, *args)
  puts "Mock enqueue in #{delay}: async=#{!!@async}, stack_depth=#{caller.size}, #{klass}, #{args.inspect}" if ENV['VERBOSE']
  defer(klass, args, delay)
end

#roundtrip(args) ⇒ Object



78
79
80
# File 'lib/resque/mock.rb', line 78

def roundtrip(args)
  decode(encode(args))
end

#wait_for_worker_managerObject



58
59
60
61
62
# File 'lib/resque/mock.rb', line 58

def wait_for_worker_manager
  @worker_manager[:exit] = true
  @worker_manager.join
  @worker_manager = nil
end