Class: Aws::Xray::Worker
- Inherits:
-
Object
- Object
- Aws::Xray::Worker
- Defined in:
- lib/aws/xray/worker.rb
Defined Under Namespace
Classes: Configuration
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(queue) ⇒ Worker
constructor
A new instance of Worker.
- #run ⇒ Object
Constructor Details
#initialize(queue) ⇒ Worker
Returns a new instance of Worker.
46 47 48 |
# File 'lib/aws/xray/worker.rb', line 46
#
# Builds a worker bound to the given queue. The queue is only stored here;
# nothing is started until #run is called.
#
# @param queue [#pop] queue object that #run later hands to its thread
def initialize(queue)
  @queue = queue
end
Class Method Details
.post(payload) ⇒ Object
18 19 20 21 22 23 24 25 26 27 |
# File 'lib/aws/xray/worker.rb', line 18
#
# Enqueues +payload+ on the shared worker queue without blocking the caller.
#
# The push happens while holding +@post_lock+, after +refresh_if_forked+
# has run.
#
# @param payload [Object] item to hand to the worker threads
# @return [Object] result of the final debug log call
# @raise [QueueIsFullError] when the queue has no free slot (wraps the
#   ThreadError raised by the non-blocking push)
def post(payload)
  Aws::Xray.config.logger.debug("#{Thread.current}: Worker.post received a job")
  @post_lock.synchronize do
    refresh_if_forked
    # Non-blocking push: a blocking push on a full SizedQueue would suspend
    # the caller while it still holds @post_lock and would never raise the
    # ThreadError that the rescue clause below translates.
    @queue.push(payload, true)
  end
  Aws::Xray.config.logger.debug("#{Thread.current}: Worker.post pushed a job")
rescue ThreadError => e
  raise QueueIsFullError.new(e)
end
.reset(config) ⇒ Object
30 31 32 33 34 |
# File 'lib/aws/xray/worker.rb', line 30
#
# Rebuilds the worker pool from +config+: allocates a new sized queue,
# kills any workers left over from a previous call, then starts
# +config.num+ new workers that all share the new queue.
#
# @param config [#max_queue_size, #num] source of the queue capacity and
#   the number of workers to start
# @return [Array] the values returned by Worker#run for each new worker
def reset(config)
  @queue = Thread::SizedQueue.new(config.max_queue_size)

  # On the very first call @workers has never been assigned.
  previous = defined?(@workers) ? @workers : []
  previous.each { |worker| worker.kill } unless previous.empty?

  @workers = Array.new(config.num) { new(@queue).run }
end
Instance Method Details
#run ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/aws/xray/worker.rb', line 50
#
# Starts a thread that loops forever: it pops an item from the queue and,
# when the item is truthy, passes its string form to Client.send_payload.
# Falsy items are logged and skipped.
#
# @return [Thread] the started thread, with abort_on_exception enabled
def run
  Thread.new(@queue) do |jobs|
    loop do
      Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run waits a job")
      job = jobs.pop
      Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run received a job")

      # nil/false items carry nothing to send; note it and wait for the next one.
      unless job
        Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run received invalid item, ignored it")
        next
      end

      Client.send_payload(job.to_s)
      Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run sent a payload")
    end
  end.tap { |worker_thread| worker_thread.abort_on_exception = true }
end