Class: Beanstalk::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/beanstalk-client/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(addrs, default_tube = nil) ⇒ Pool

Returns a new instance of Pool.



239
240
241
242
243
244
245
# File 'lib/beanstalk-client/connection.rb', line 239

def initialize(addrs, default_tube=nil)
  @addrs = addrs
  @watch_list = ['default']
  @default_tube=default_tube
  @watch_list = [default_tube] if default_tube
  connect()
end

Instance Attribute Details

#last_connObject

Returns the value of attribute last_conn.



237
238
239
# File 'lib/beanstalk-client/connection.rb', line 237

def last_conn
  @last_conn
end

Instance Method Details

#closeObject

Close all open connections for this pool



352
353
354
355
356
357
358
359
# File 'lib/beanstalk-client/connection.rb', line 352

def close
  while @connections.size > 0
    addr = @connections.keys.last
    conn = @connections[addr]
    @connections.delete(addr)
    conn.close
  end
end

#connectObject



247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/beanstalk-client/connection.rb', line 247

def connect()
  @connections ||= {}
  @addrs.each do |addr|
    begin
      if !@connections.include?(addr)
        @connections[addr] = Connection.new(addr, @default_tube)
        prev_watched = @connections[addr].list_tubes_watched()
        to_ignore = prev_watched - @watch_list
        @watch_list.each{|tube| @connections[addr].watch(tube)}
        to_ignore.each{|tube| @connections[addr].ignore(tube)}
      end
    rescue Errno::ECONNREFUSED
      raise NotConnected
    rescue Exception => ex
      puts "#{ex.class}: #{ex}"
    end
  end
  @connections.size
end

#ignore(tube) ⇒ Object



311
312
313
314
315
# File 'lib/beanstalk-client/connection.rb', line 311

def ignore(tube)
  r = send_to_all_conns(:ignore, tube)
  @watch_list = send_to_rand_conn(:list_tubes_watched, true)
  return r
end

#last_serverObject



271
272
273
# File 'lib/beanstalk-client/connection.rb', line 271

def last_server
  @last_conn.addr
end

#list_tube_usedObject



337
338
339
# File 'lib/beanstalk-client/connection.rb', line 337

def list_tube_used()
  send_to_all_conns(:list_tube_used)
end

#list_tubesObject



333
334
335
# File 'lib/beanstalk-client/connection.rb', line 333

def list_tubes()
  send_to_all_conns(:list_tubes)
end

#list_tubes_watched(*args) ⇒ Object



341
342
343
# File 'lib/beanstalk-client/connection.rb', line 341

def list_tubes_watched(*args)
  send_to_all_conns(:list_tubes_watched, *args)
end

#open_connectionsObject



267
268
269
# File 'lib/beanstalk-client/connection.rb', line 267

def open_connections()
  @connections.values()
end

#peek_buriedObject



369
370
371
# File 'lib/beanstalk-client/connection.rb', line 369

def peek_buried()
  send_to_each_conn_first_res(:peek_buried)
end

#peek_delayedObject



365
366
367
# File 'lib/beanstalk-client/connection.rb', line 365

def peek_delayed()
  send_to_each_conn_first_res(:peek_delayed)
end

#peek_job(id) ⇒ Object



373
374
375
# File 'lib/beanstalk-client/connection.rb', line 373

def peek_job(id)
  make_hash(send_to_all_conns(:peek_job, id))
end

#peek_readyObject



361
362
363
# File 'lib/beanstalk-client/connection.rb', line 361

def peek_ready()
  send_to_each_conn_first_res(:peek_ready)
end

#put(body, pri = 65536, delay = 0, ttr = 120) ⇒ Object

Put a job on the queue.

Parameters:

  • body: the payload of the job (use Beanstalk::Pool#yput / Beanstalk::Job#ybody to automatically serialize your payload with YAML)

  • pri: priority. Default 65536 (higher numbers are higher priority)

  • delay: how long to wait until making the job available for reservation

  • ttr: time in seconds for the job to reappear on the queue (if beanstalk doesn’t hear from a consumer within this time, assume the consumer died and make the job available for someone else to process). Default 120 seconds.



283
284
285
# File 'lib/beanstalk-client/connection.rb', line 283

def put(body, pri=65536, delay=0, ttr=120)
  send_to_rand_conn(:put, body, pri, delay, ttr)
end

#raw_statsObject



317
318
319
# File 'lib/beanstalk-client/connection.rb', line 317

def raw_stats()
  send_to_all_conns(:stats)
end

#raw_stats_tube(tube) ⇒ Object



325
326
327
# File 'lib/beanstalk-client/connection.rb', line 325

def raw_stats_tube(tube)
  send_to_all_conns(:stats_tube, tube)
end

#remove(conn) ⇒ Object



345
346
347
348
349
# File 'lib/beanstalk-client/connection.rb', line 345

def remove(conn)
  connection = @connections.delete(conn.addr)
  connection.close if connection
  connection
end

#reserve(timeout = nil) ⇒ Object

Reserve a job from the queue.

Parameters

  • timeout - Time (in seconds) to wait for a job to be available. If nil, wait indefinitely.



297
298
299
# File 'lib/beanstalk-client/connection.rb', line 297

def reserve(timeout=nil)
  send_to_rand_conn(:reserve, timeout)
end

#statsObject



321
322
323
# File 'lib/beanstalk-client/connection.rb', line 321

def stats()
  sum_hashes(raw_stats.values)
end

#stats_tube(tube) ⇒ Object



329
330
331
# File 'lib/beanstalk-client/connection.rb', line 329

def stats_tube(tube)
  sum_hashes(raw_stats_tube(tube).values)
end

#use(tube) ⇒ Object



301
302
303
# File 'lib/beanstalk-client/connection.rb', line 301

def use(tube)
  send_to_all_conns(:use, tube)
end

#watch(tube) ⇒ Object



305
306
307
308
309
# File 'lib/beanstalk-client/connection.rb', line 305

def watch(tube)
  r = send_to_all_conns(:watch, tube)
  @watch_list = send_to_rand_conn(:list_tubes_watched, true)
  return r
end

#yput(obj, pri = 65536, delay = 0, ttr = 120) ⇒ Object

Like put, but serialize the object with YAML.



288
289
290
# File 'lib/beanstalk-client/connection.rb', line 288

def yput(obj, pri=65536, delay=0, ttr=120)
  send_to_rand_conn(:yput, obj, pri, delay, ttr)
end