Class: Messed::Queue::Beanstalk

Inherits:
Messed::Queue show all
Includes:
Logger::LoggingModule
Defined in:
lib/messed/queue/beanstalk.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger::LoggingModule

included, #logger, #logger=

Constructor Details

#initialize(tube, host = '127.0.0.1', port = 11300) ⇒ Beanstalk

Returns a new instance of Beanstalk.



10
11
12
13
14
15
# File 'lib/messed/queue/beanstalk.rb', line 10

# Builds a ::Beanstalk::Pool for the single "host:port" address and
# points it at +tube+ for both producing (use) and consuming (watch).
#
# @param tube the tube name passed to Pool#use and Pool#watch
# @param host host part of the pool address (defaults to '127.0.0.1')
# @param port port part of the pool address (defaults to 11300)
def initialize(tube, host = '127.0.0.1', port = 11300)
  @tube = tube
  @host = host
  @port = port

  address = "#{host}:#{port}"
  @beanstalk = ::Beanstalk::Pool.new(Array(address))

  # The same tube is selected for puts and added to the watch list.
  @beanstalk.use(tube)
  @beanstalk.watch(tube)
end

Instance Attribute Details

#application ⇒ Object

Returns the value of attribute application.



7
8
9
# File 'lib/messed/queue/beanstalk.rb', line 7

# Reader for the @application instance variable.
#
# @return the current value of @application (nil if it was never assigned)
def application
  @application
end

#tube ⇒ Object (readonly)

Returns the value of attribute tube.



8
9
10
# File 'lib/messed/queue/beanstalk.rb', line 8

# Reader for the @tube instance variable, which the constructor sets
# from its +tube+ argument.
#
# @return the current value of @tube
def tube
  @tube
end

Instance Method Details

#<<(message) ⇒ Object



34
35
36
# File 'lib/messed/queue/beanstalk.rb', line 34

# Serializes +message+ with #to_json and puts the result onto beanstalk.
#
# @param message any object responding to #to_json
# @return whatever beanstalk.put returns
def <<(message)
  connection = beanstalk
  connection.put(message.to_json)
end

#drain! ⇒ Object



42
43
44
45
46
# File 'lib/messed/queue/beanstalk.rb', line 42

# Empties the queue: for as long as jobs_available? is true, reserves
# the next job and deletes it.
#
# @return [nil]
def drain!
  beanstalk.reserve.delete while jobs_available?
end

#jobs_available ⇒ Object



48
49
50
# File 'lib/messed/queue/beanstalk.rb', line 48

# Looks up the 'current-jobs-ready' entry in the stats that beanstalk
# reports for this queue's tube.
#
# @return the value stored under 'current-jobs-ready'
def jobs_available
  tube_stats = beanstalk.stats_tube(tube)
  tube_stats['current-jobs-ready']
end

#jobs_available? ⇒ Boolean

Returns:

  • (Boolean)


38
39
40
# File 'lib/messed/queue/beanstalk.rb', line 38

# Tells whether jobs_available reports a non-zero count.
#
# @return [Boolean] false when jobs_available is zero, true otherwise
def jobs_available?
  ready_count = jobs_available
  !ready_count.zero?
end

#status ⇒ Object



17
18
19
# File 'lib/messed/queue/beanstalk.rb', line 17

# Fetches the stats that the beanstalk pool reports for this queue's tube.
#
# @return whatever @beanstalk.stats_tube returns for the tube
def status
  pool = @beanstalk
  pool.stats_tube(tube)
end

#take(block = true) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/messed/queue/beanstalk.rb', line 21

# Reserves one job from beanstalk, decodes its body with
# application.message_class.from_json, and yields the decoded message to
# the caller's block. The job is deleted once the block has returned.
#
# When decoding raises JSON::ParserError, the raw body is logged as an
# error and the job is deleted without yielding. An exception raised by
# the caller's block propagates and the job is not deleted here.
#
# NOTE(review): the +block+ argument is accepted but never read in this
# method — confirm whether it was meant to control a blocking reserve.
#
# @yield [message] the decoded message
# @return the result of job.delete
def take(block = true)
  job = beanstalk.reserve

  begin
    message = application.message_class.from_json(job.body)
  rescue JSON::ParserError
    logger.error "malformed message #{job.body}"
    return job.delete
  end

  # Kept outside the begin/rescue so errors from the block are not
  # mistaken for a malformed message.
  yield message
  job.delete
end