Module: DRbQueue
- Extended by:
- DRbQueue, Forwardable
- Included in:
- DRbQueue
- Defined in:
- lib/drb_queue.rb,
lib/drb_queue/store.rb,
lib/drb_queue/server.rb,
lib/drb_queue/version.rb,
lib/drb_queue/store/redis.rb,
lib/drb_queue/configuration.rb
Defined Under Namespace
Classes: Configuration, Server, Store
Constant Summary
collapse
- ConfiguredAfterStarted =
Class.new(StandardError)
- VERSION =
"0.0.2"
Instance Attribute Summary collapse
Instance Method Summary
collapse
Instance Attribute Details
#started ⇒ Object
Also known as:
started?
Returns the value of attribute started.
19
20
21
|
# File 'lib/drb_queue.rb', line 19
# Reader for the @started flag.
#
# Within this module, start! sets the flag to true after forking the server
# and shutdown! resets it to false. It is nil if it has never been assigned.
#
# @return [Boolean, nil] the current value of @started
def started
@started
end
|
Instance Method Details
#configure {|configuration| ... } ⇒ Object
49
50
51
52
53
|
# File 'lib/drb_queue.rb', line 49
# Yields the configuration object to the caller's block so settings can be
# changed. The block runs inside +synchronize+.
#
# Configuration is only permitted before the server has been started.
#
# @yield [configuration] the object returned by +configuration+
# @raise [ConfiguredAfterStarted] if +started?+ is already truthy
# @return whatever +synchronize+ returns for the block
def configure
  if started?
    raise ConfiguredAfterStarted, "You must configure #{self.name} BEFORE starting the server"
  end

  synchronize do
    yield configuration
  end
end
|
#connect_client! ⇒ Object
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
# File 'lib/drb_queue.rb', line 77
# Builds the DRb client proxy for the queue server and verifies that it
# answers +ping+. The whole operation runs inside +synchronize+.
#
# A DRb connection error triggers a retry after an exponentially growing
# pause (0.1s, 0.2s, 0.4s, 0.8s, 1.6s). After the fifth retry has also
# failed (six attempts in total) the error is wrapped and raised.
#
# @raise [Server::UnableToStart] when the server never becomes reachable;
#   the last DRb::DRbConnError is passed as the second constructor argument
# @return whatever +synchronize+ returns for the block (the ping result
#   on success, if +synchronize+ passes the block value through)
def connect_client!
  synchronize do
    attempt = 0
    begin
      @server = DRbObject.new_with_uri(server_uri)
      @server.ping
    rescue DRb::DRbConnError => conn_error
      if attempt > 4
        raise Server::UnableToStart.new("Couldn't start up the queue server", conn_error)
      end

      # Back off twice as long after every failed attempt.
      pause = 0.1 * (2 ** attempt)
      sleep pause
      attempt += 1
      retry
    end
  end
end
|
#enqueue(worker, *args) ⇒ Object
22
23
24
25
26
27
28
|
# File 'lib/drb_queue.rb', line 22
# Hands a unit of work to the queue server.
#
# @param worker [Module] a module (or class) that responds to +perform+
# @param args arguments forwarded, together with +worker+, to the server's
#   +enqueue+
# @raise [Server::NotStarted] if the server has not been started
# @raise [ArgumentError] if +worker+ is not a Module or does not respond to
#   +perform+
# @return whatever the server's +enqueue+ returns
def enqueue(worker, *args)
  unless started?
    raise Server::NotStarted, "You must start the server first"
  end

  unless worker.is_a?(Module)
    raise ArgumentError, "#{worker} is not a module"
  end

  unless worker.respond_to?(:perform)
    raise ArgumentError, "#{worker} does not respond to perform"
  end

  server.enqueue(worker, *args)
end
|
#shutdown!(immediately = false) ⇒ Object
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
# File 'lib/drb_queue.rb', line 55
# Stops the forked queue server and resets the client-side state.
#
# The server process is sent TERM (or KILL when +immediately+ is true) and
# then reaped. If it has not exited within 20 seconds it is sent KILL, reaped
# and the forced shutdown is logged.
#
# Only the server's own pid is waited on, so unrelated child processes of the
# caller are never reaped by this method. If the server has already exited or
# been reaped, that is treated as a successful shutdown. The socket cleanup
# and state reset always run once a shutdown has been attempted.
#
# Does nothing when the server is not started.
#
# @param immediately [Boolean] send KILL straight away instead of TERM
# @return [void]
def shutdown!(immediately = false)
  return unless started?

  synchronize do
    # Re-check under the lock: another caller may have shut down meanwhile.
    return unless started?

    server_pid = pid
    begin
      Process.kill(immediately ? 'KILL' : 'TERM', server_pid)
      begin
        ::Timeout.timeout(20) { Process.wait(server_pid) }
      rescue ::Timeout::Error
        Process.kill('KILL', server_pid)
        Process.wait(server_pid)
        logger.error("#{self}: forced shutdown")
      end
    rescue Errno::ESRCH, Errno::ECHILD
      # The server process is already gone (ESRCH) or was already reaped
      # (ECHILD); there is nothing left to stop, so fall through to cleanup.
    ensure
      cleanup_socket
      @started = false
      @pid = nil
    end
  end
end
|
#start! ⇒ Object
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
# File 'lib/drb_queue.rb', line 30
# Forks the queue server and connects the client to it.
#
# An +at_exit+ hook that calls +shutdown!+ is installed the first time the
# server is started; later restarts reuse that hook instead of stacking a new
# one per call. If the client cannot connect, the freshly forked server is
# shut down again and the original error is re-raised.
#
# @raise [Server::AlreadyStarted] if the server is already started
# @raise [StandardError] whatever +connect_client!+ raised, after cleanup
# @return whatever +synchronize+ returns for the block
def start!
  raise Server::AlreadyStarted, "The server is already started" if started?

  synchronize do
    # Another caller may have completed start! while we waited for the lock.
    return if started?

    @pid = fork_server

    # Register the exit hook only once per process.
    unless @exit_hook_registered
      at_exit { shutdown! }
      @exit_hook_registered = true
    end

    @started = true
    begin
      connect_client!
    rescue
      shutdown!
      raise
    end
  end
end
|