Class: Droonga::JobPusher::JobQueue
- Inherits:
-
Object
- Object
- Droonga::JobPusher::JobQueue
- Includes:
- Loggable
- Defined in:
- lib/droonga/job_pusher.rb
Instance Method Summary collapse
- #add_worker(worker) ⇒ Object
- #broadcast(message) ⇒ Object
- #close ⇒ Object
-
#initialize(loop) ⇒ JobQueue
constructor
A new instance of JobQueue.
- #push(message) ⇒ Object
Constructor Details
#initialize(loop) ⇒ JobQueue
Returns a new instance of JobQueue.
80 81 82 83 84 85 86 87 |
# File 'lib/droonga/job_pusher.rb', line 80 def initialize(loop) @loop = loop @buffers = [] @ready_workers = [] @workers = [] @many_jobs_report_interval = 100 update_many_jobs_threshold end |
Instance Method Details
#add_worker(worker) ⇒ Object
95 96 97 98 99 100 101 |
# File 'lib/droonga/job_pusher.rb', line 95 def add_worker(worker) @workers << worker update_many_jobs_threshold worker.on_ready = lambda do |ready_worker| supply_job(ready_worker) end end |
#broadcast(message) ⇒ Object
119 120 121 122 123 |
# File 'lib/droonga/job_pusher.rb', line 119 def broadcast() @workers.each do |worker| worker.write(.to_msgpack) end end |
#close ⇒ Object
89 90 91 92 93 |
# File 'lib/droonga/job_pusher.rb', line 89 def close @workers.each do |worker| worker.close end end |
#push(message) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/droonga/job_pusher.rb', line 103 def push() job = .to_msgpack if @ready_workers.empty? @buffers << job report_statistics_on_push else worker = @ready_workers.shift if @buffers.empty? worker.write(job) else @buffers << job worker.write(@buffers.shift) end end end |