Class: Droonga::JobPusher::JobQueue

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/droonga/job_pusher.rb

Instance Method Summary collapse

Constructor Details

#initialize(loop) ⇒ JobQueue

Returns a new instance of JobQueue.



80
81
82
83
84
85
86
87
# File 'lib/droonga/job_pusher.rb', line 80

def initialize(loop)
  @loop = loop
  @buffers = []
  @ready_workers = []
  @workers = []
  @many_jobs_report_interval = 100
  update_many_jobs_threshold
end

Instance Method Details

#add_worker(worker) ⇒ Object



95
96
97
98
99
100
101
# File 'lib/droonga/job_pusher.rb', line 95

def add_worker(worker)
  @workers << worker
  update_many_jobs_threshold
  worker.on_ready = lambda do |ready_worker|
    supply_job(ready_worker)
  end
end

#broadcast(message) ⇒ Object



119
120
121
122
123
# File 'lib/droonga/job_pusher.rb', line 119

def broadcast(message)
  @workers.each do |worker|
    worker.write(message.to_msgpack)
  end
end

#closeObject



89
90
91
92
93
# File 'lib/droonga/job_pusher.rb', line 89

def close
  @workers.each do |worker|
    worker.close
  end
end

#push(message) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/droonga/job_pusher.rb', line 103

def push(message)
  job = message.to_msgpack
  if @ready_workers.empty?
    @buffers << job
    report_statistics_on_push
  else
    worker = @ready_workers.shift
    if @buffers.empty?
      worker.write(job)
    else
      @buffers << job
      worker.write(@buffers.shift)
    end
  end
end