Class: Messed::Queue::Beanstalk
Instance Attribute Summary collapse
Instance Method Summary collapse
included, #logger, #logger=
Constructor Details
#initialize(tube, host = '127.0.0.1', port = 11300) ⇒ Beanstalk
Returns a new instance of Beanstalk.
10
11
12
13
14
15
|
# File 'lib/messed/queue/beanstalk.rb', line 10
# Builds a queue bound to a single beanstalk tube.
#
# Stores the tube name and server coordinates, creates a connection pool
# for the one "host:port" address, then both uses and watches the tube on
# that pool.
#
# @param tube [String] name of the tube to use and watch
# @param host [String] server host, defaults to '127.0.0.1'
# @param port [Integer] server port, defaults to 11300
def initialize(tube, host = '127.0.0.1', port = 11300)
  @tube = tube
  @host = host
  @port = port

  address = "#{host}:#{port}"
  @beanstalk = ::Beanstalk::Pool.new([address])
  @beanstalk.use(tube)
  @beanstalk.watch(tube)
end
|
Instance Attribute Details
#application ⇒ Object
Returns the value of attribute application.
7
8
9
|
# File 'lib/messed/queue/beanstalk.rb', line 7
# Reader for the +@application+ instance variable.
#
# Elsewhere in this class, #take calls +message_class+ on this object to
# decode job bodies, so it must be set before messages are taken.
#
# @return [Object] the current value of +@application+ (nil if never set)
def application
@application
end
|
#tube ⇒ Object
Returns the value of attribute tube.
8
9
10
|
# File 'lib/messed/queue/beanstalk.rb', line 8
# Reader for the +@tube+ instance variable.
#
# The constructor stores its +tube+ argument here; the stats helpers in
# this class pass this value to +stats_tube+.
#
# @return [Object] the current value of +@tube+
def tube
@tube
end
|
Instance Method Details
#<<(message) ⇒ Object
34
35
36
|
# File 'lib/messed/queue/beanstalk.rb', line 34
# Enqueues a message by serializing it with +to_json+ and putting the
# resulting string onto the beanstalk connection.
#
# @param message [#to_json] the message to enqueue
# @return [Object] whatever the underlying +put+ call returns
def <<(message)
  payload = message.to_json
  beanstalk.put(payload)
end
|
#drain! ⇒ Object
42
43
44
45
46
|
# File 'lib/messed/queue/beanstalk.rb', line 42
# Empties the queue: for as long as #jobs_available? reports true,
# reserves a job and deletes it immediately without processing it.
#
# @return [nil]
def drain!
  beanstalk.reserve.delete while jobs_available?
end
|
#jobs_available ⇒ Object
48
49
50
|
# File 'lib/messed/queue/beanstalk.rb', line 48
# Looks up the stats for this queue's tube and returns the entry stored
# under the 'current-jobs-ready' key.
#
# @return [Object] the 'current-jobs-ready' value from the tube stats
#   (nil if the stats hash has no such key)
def jobs_available
  tube_stats = beanstalk.stats_tube(tube)
  tube_stats['current-jobs-ready']
end
|
#jobs_available? ⇒ Boolean
38
39
40
|
# File 'lib/messed/queue/beanstalk.rb', line 38
# Tells whether #jobs_available reports a non-zero count.
#
# @return [Boolean] false when the count is zero, true otherwise
def jobs_available?
  ready_count = jobs_available
  !ready_count.zero?
end
|
#status ⇒ Object
17
18
19
|
# File 'lib/messed/queue/beanstalk.rb', line 17
# Fetches the stats for this queue's tube from the connection pool held
# in +@beanstalk+.
#
# @return [Object] whatever +stats_tube+ returns for the tube
def status
  tube_name = tube
  @beanstalk.stats_tube(tube_name)
end
|
#take(block = true) ⇒ Object
21
22
23
24
25
26
27
28
29
30
31
32
|
# File 'lib/messed/queue/beanstalk.rb', line 21
# Reserves the next job, decodes its body into a message and hands the
# message to the caller's block.
#
# The body is decoded with +application.message_class.from_json+. When
# decoding raises JSON::ParserError the body is logged as malformed and
# the job is deleted without yielding. Otherwise the message is yielded
# and the job is deleted once the block returns. If the block raises, the
# job is not deleted by this method.
#
# A block is mandatory. This is checked before reserving so that a call
# without a block cannot pull a job off the queue and then fail at the
# +yield+, leaving that job reserved and unprocessed.
#
# @param block [Boolean] accepted for interface compatibility; this method
#   does not read it
# @yield [message] the decoded message
# @return [Object] the result of deleting the job
# @raise [ArgumentError] if no block is given
def take(block = true)
  raise ArgumentError, 'take requires a block' unless block_given?

  job = beanstalk.reserve
  begin
    message = application.message_class.from_json(job.body)
  rescue JSON::ParserError
    # An undecodable body is logged and the job is deleted right away.
    logger.error "malformed message #{job.body}"
    job.delete
  else
    # Kept outside the rescued section so a JSON::ParserError raised by
    # the caller's block is not mistaken for a malformed job body.
    yield message
    job.delete
  end
end
|