Class: AsyncObserver::Queue
- Inherits:
-
Object
- Object
- AsyncObserver::Queue
- Defined in:
- lib/async_observer/queue.rb
Constant Summary collapse
- DEFAULT_PRI =
512
- DEFAULT_FUZZ =
0
- DEFAULT_DELAY =
0
- DEFAULT_TTR =
120
- DEFAULT_TUBE =
'default'
- SUBMIT_OPTS =
[:pri, :fuzz, :delay, :ttr, :tube]
Class Attribute Summary collapse
-
.after_put ⇒ Object
Returns the value of attribute after_put.
-
.queue ⇒ Object
Returns the value of attribute queue.
Class Method Summary collapse
-
.gen(obj, selector, args) ⇒ Object
Be careful not to pass in a selector that’s not valid ruby source.
- .gen_args(args) ⇒ Object
- .pkg(code, opts) ⇒ Object
- .put!(obj, pri = DEFAULT_PRI, delay = DEFAULT_DELAY, ttr = DEFAULT_TTR, tube = DEFAULT_TUBE) ⇒ Object
- .put_call!(obj, sel, opts, args = []) ⇒ Object
-
.sync_run(obj) ⇒ Object
This runs jobs synchronously; it’s used when no queue is configured.
-
.sync_worker ⇒ Object
This is a fake worker instance for running jobs synchronously.
Class Attribute Details
.after_put ⇒ Object
Returns the value of attribute after_put.
32 33 34 |
# File 'lib/async_observer/queue.rb', line 32 def after_put @after_put end |
.queue ⇒ Object
Returns the value of attribute queue.
32 33 34 |
# File 'lib/async_observer/queue.rb', line 32 def queue @queue end |
Class Method Details
.gen(obj, selector, args) ⇒ Object
Be careful not to pass in a selector that’s not valid ruby source.
89 90 91 |
# File 'lib/async_observer/queue.rb', line 89 def gen(obj, selector, args) obj.rrepr + '.' + selector.to_s + '(' + gen_args(args) + ')' end |
.gen_args(args) ⇒ Object
93 94 95 |
# File 'lib/async_observer/queue.rb', line 93 def gen_args(args) args.rrepr[1...-1] end |
.pkg(code, opts) ⇒ Object
84 85 86 |
# File 'lib/async_observer/queue.rb', line 84 def pkg(code, opts) opts.merge(:type => :rails, :code => code) end |
.put!(obj, pri = DEFAULT_PRI, delay = DEFAULT_DELAY, ttr = DEFAULT_TTR, tube = DEFAULT_TUBE) ⇒ Object
49 50 51 52 53 54 55 56 57 58 |
# File 'lib/async_observer/queue.rb', line 49 def put!(obj, pri=DEFAULT_PRI, delay=DEFAULT_DELAY, ttr=DEFAULT_TTR, tube=DEFAULT_TUBE) return sync_run(obj) if pri == :direct || !queue queue.connect() queue.use(tube) info = [queue.yput(obj, pri, delay, ttr), queue.last_server] f = AsyncObserver::Queue.after_put f.call(*info) if f return info end |
.put_call!(obj, sel, opts, args = []) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/async_observer/queue.rb', line 62 def put_call!(obj, sel, opts, args=[]) pri = opts.fetch(:pri, DEFAULT_PRI) fuzz = opts.fetch(:fuzz, DEFAULT_FUZZ) delay = opts.fetch(:delay, DEFAULT_DELAY) ttr = opts.fetch(:ttr, DEFAULT_TTR) tube = opts.fetch(:tube, DEFAULT_TUBE) worker_opts = opts.reject{|k,v| SUBMIT_OPTS.include?(k)} interpolator = opts.fetch(:interpolator, nil) pri = pri + rand(fuzz + 1) if pri != :direct if interpolator code = packed = interpolator else code = gen(obj, sel, args) packed = pkg(code, worker_opts) end RAILS_DEFAULT_LOGGER.info("put #{pri} #{code} to #{tube}") put!(packed, pri, delay, ttr, tube) end |
.sync_run(obj) ⇒ Object
This runs jobs synchronously; it’s used when no queue is configured.
41 42 43 44 45 46 47 |
# File 'lib/async_observer/queue.rb', line 41 def sync_run(obj) body = YAML.dump(obj) job = Beanstalk::Job.new(AsyncObserver::FakeConn.new(), 0, body) sync_worker.dispatch(job) sync_worker.do_all_work() return 0, '0.0.0.0' end |
.sync_worker ⇒ Object
This is a fake worker instance for running jobs synchronously.
35 36 37 38 |
# File 'lib/async_observer/queue.rb', line 35 def sync_worker() require 'async_observer/worker' @sync_worker ||= AsyncObserver::Worker.new(binding) end |