Module: Resque::MockExt
- Defined in:
- lib/resque/mock.rb
Instance Method Summary
- #add_job(data) ⇒ Object
- #async ⇒ Object
- #create_worker_manager ⇒ Object
- #create_worker_thread_for(data) ⇒ Object
- #defer(klass, args, delay = nil) ⇒ Object
- #enqueue(klass, *args) ⇒ Object
- #enqueue_in(delay, klass, *args) ⇒ Object
- #roundtrip(args) ⇒ Object
- #wait_for_worker_manager ⇒ Object
Instance Method Details
#add_job(data) ⇒ Object
82 83 84 |
# File 'lib/resque/mock.rb', line 82
# Appends a job description to the +:jobs+ list kept on the worker manager
# thread (+@worker_manager+).
#
# @param data [Object] the job description to queue; #defer passes a Hash
#   with 'payload' and 'delay' keys
# @return [Object] whatever pushing onto the +:jobs+ list returns
def add_job(data)
  pending = @worker_manager[:jobs]
  pending.push(data)
end
#async ⇒ Object
9 10 11 12 13 14 15 16 |
# File 'lib/resque/mock.rb', line 9
# Runs the given block with asynchronous mode switched on: a worker manager
# is created first, and once the block finishes (normally or by raising) the
# manager is waited for and asynchronous mode is switched off again.
#
# The +@async+ flag is reset even when waiting for the manager raises, so a
# failure during shutdown does not leave the mock stuck in asynchronous mode.
#
# @yield the code to run while +@async+ is true
# @return [Object] the value of the block
def async
  @async = true
  create_worker_manager
  yield
ensure
  begin
    # If creating the manager failed there is nothing to wait for; skipping
    # the wait lets the original error propagate instead of a NoMethodError.
    wait_for_worker_manager if @worker_manager
  ensure
    @async = false
  end
end
#create_worker_manager ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/resque/mock.rb', line 39
# Starts the manager thread and stores it in +@worker_manager+.
#
# The thread polls its own +:jobs+ list: every pass it drops worker threads
# that are no longer alive, starts a worker thread (via
# #create_worker_thread_for) for each queued job, then sleeps for half a
# second. It stops once its +:exit+ flag is set, no worker threads are
# tracked, and the +:jobs+ list is empty.
#
# @return [Thread] the manager thread, with +:jobs+ initialised to an empty Array
def create_worker_manager
  manager = Thread.new do
    me = Thread.current
    me.abort_on_exception = true
    running = []

    # The exit condition is evaluated at the top of every pass, before dead
    # workers are pruned.
    until me[:exit] && running.empty? && me[:jobs].empty?
      running.select!(&:alive?)

      # :jobs is assigned from outside the thread after it has started, so
      # it may still be nil on the first pass.
      while (queue = me[:jobs]) && (job = queue.shift)
        running << create_worker_thread_for(job)
      end

      sleep 0.5
    end
  end

  manager[:jobs] = []
  @worker_manager = manager
end
#create_worker_thread_for(data) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/resque/mock.rb', line 64
# Starts a thread that performs one queued job.
#
# The thread optionally sleeps for the job's 'delay', then calls +perform+ on
# the payload's 'class' with the payload's 'args' passed through #roundtrip.
# When ENV['VERBOSE'] is set, a line is printed before and after the call.
#
# @param data [Hash] job description with a 'payload' Hash ('class', 'args')
#   and an optional 'delay'
# @return [Thread] the worker thread
def create_worker_thread_for(data)
  Thread.new(data) do |job|
    Thread.current.abort_on_exception = true

    pause = job['delay']
    sleep pause if pause

    payload = job['payload']
    klass = payload['class']
    args = payload['args']

    puts "Mock perform: #{klass}.perform(*#{args.inspect})" if ENV['VERBOSE']
    klass.perform(*roundtrip(args))
    puts "Mock exit: #{klass}.perform(*#{args.inspect})" if ENV['VERBOSE']
  end
end
#defer(klass, args, delay = nil) ⇒ Object
28 29 30 31 32 33 34 35 36 37 |
# File 'lib/resque/mock.rb', line 28
# Validates the job class, then either queues the job (asynchronous mode) or
# performs it immediately in the calling thread.
#
# In synchronous mode the caller sleeps for +delay+ (when given) before
# +klass.perform+ is invoked with the arguments passed through #roundtrip.
#
# @param klass [Object] the job class; must respond to +perform+
# @param args [Array] arguments for +perform+
# @param delay [Numeric, nil] seconds to wait before performing
# @return [Object] the result of #add_job in asynchronous mode, otherwise the
#   result of +klass.perform+
def defer(klass, args, delay = nil)
  validate(klass) # defined outside this listing

  unless @async
    sleep delay if delay
    return klass.perform(*roundtrip(args))
  end

  job = { 'payload' => { 'class' => klass, 'args' => args }, 'delay' => delay }
  add_job(job)
end
#enqueue(klass, *args) ⇒ Object
18 19 20 21 |
# File 'lib/resque/mock.rb', line 18
# Enqueues a job with no delay by handing it to #defer. When ENV['VERBOSE']
# is set, a diagnostic line is printed first.
#
# @param klass [Object] the job class
# @param args [Array] arguments for the job
# @return [Object] the result of #defer
def enqueue(klass, *args)
  if ENV['VERBOSE']
    async_flag = @async ? true : false
    depth = caller.size
    puts "Mock enqueue: async=#{async_flag}, stack_depth=#{depth}, #{klass}, #{args.inspect}"
  end

  defer(klass, args)
end
#enqueue_in(delay, klass, *args) ⇒ Object
23 24 25 26 |
# File 'lib/resque/mock.rb', line 23
# Enqueues a job to be performed after +delay+ by handing it to #defer. When
# ENV['VERBOSE'] is set, a diagnostic line is printed first.
#
# @param delay [Numeric] delay forwarded to #defer
# @param klass [Object] the job class
# @param args [Array] arguments for the job
# @return [Object] the result of #defer
def enqueue_in(delay, klass, *args)
  if ENV['VERBOSE']
    async_flag = @async ? true : false
    depth = caller.size
    puts "Mock enqueue in #{delay}: async=#{async_flag}, stack_depth=#{depth}, #{klass}, #{args.inspect}"
  end

  defer(klass, args, delay)
end
#roundtrip(args) ⇒ Object
78 79 80 |
# File 'lib/resque/mock.rb', line 78
# Passes +args+ through +encode+ and then +decode+ and returns the result.
# NOTE(review): +encode+/+decode+ are defined outside this listing —
# presumably Resque's own serialisation helpers; confirm.
#
# @param args [Object] the job arguments
# @return [Object] the decoded form of the encoded arguments
def roundtrip(args)
  serialized = encode(args)
  decode(serialized)
end
#wait_for_worker_manager ⇒ Object
58 59 60 61 62 |
# File 'lib/resque/mock.rb', line 58
# Signals the manager thread to exit, waits for it to finish, and clears
# +@worker_manager+.
#
# The reference is cleared in an +ensure+ clause so that a manager thread
# which terminated with an exception (re-raised by +join+) does not leave a
# stale thread behind in +@worker_manager+.
#
# @return [nil]
# @raise [Exception] whatever exception terminated the manager thread, as
#   re-raised by Thread#join
def wait_for_worker_manager
  @worker_manager[:exit] = true
  @worker_manager.join
  nil # keep the original nil return value; ensure does not alter it
ensure
  @worker_manager = nil
end