Class: Droonga::JobPusher::JobQueue

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/droonga/job_pusher.rb

Instance Method Summary collapse

Constructor Details

#initialize(loop) ⇒ JobQueue

Returns a new instance of JobQueue.



74
75
76
77
78
79
80
81
# File 'lib/droonga/job_pusher.rb', line 74

def initialize(loop)
  @loop = loop
  @buffers = []
  @ready_workers = []
  @workers = []
  @many_jobs_report_interval = 100
  update_many_jobs_threshold
end

Instance Method Details

#add_worker(worker) ⇒ Object



89
90
91
92
93
94
95
# File 'lib/droonga/job_pusher.rb', line 89

def add_worker(worker)
  @workers << worker
  update_many_jobs_threshold
  worker.on_ready = lambda do |ready_worker|
    supply_job(ready_worker)
  end
end

#broadcast(message) ⇒ Object



113
114
115
116
117
# File 'lib/droonga/job_pusher.rb', line 113

def broadcast(message)
  @workers.each do |worker|
    worker.write(message.to_msgpack)
  end
end

#closeObject



83
84
85
86
87
# File 'lib/droonga/job_pusher.rb', line 83

def close
  @workers.each do |worker|
    worker.close
  end
end

#push(message) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/droonga/job_pusher.rb', line 97

def push(message)
  job = message.to_msgpack
  if @ready_workers.empty?
    @buffers << job
    report_statistics_on_push
  else
    worker = @ready_workers.shift
    if @buffers.empty?
      worker.write(job)
    else
      @buffers << job
      worker.write(@buffers.shift)
    end
  end
end