Module: DRbQueue

Extended by:
DRbQueue, Forwardable
Included in:
DRbQueue
Defined in:
lib/drb_queue.rb,
lib/drb_queue/store.rb,
lib/drb_queue/server.rb,
lib/drb_queue/version.rb,
lib/drb_queue/store/redis.rb,
lib/drb_queue/configuration.rb

Defined Under Namespace

Classes: Configuration, Server, Store

Constant Summary collapse

ConfiguredAfterStarted =
Class.new(StandardError)
VERSION =
"0.0.2"

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#startedObject (readonly) Also known as: started?

Returns the value of attribute started.



19
20
21
# File 'lib/drb_queue.rb', line 19

def started
  @started
end

Instance Method Details

#configureObject



49
50
51
52
53
# File 'lib/drb_queue.rb', line 49

def configure
  raise ConfiguredAfterStarted, "You must configure #{self.name} BEFORE starting the server" if started?

  synchronize { yield configuration }
end

#connect_client!Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/drb_queue.rb', line 77

def connect_client!
  synchronize do
    tries = 0

    begin
      @server = DRbObject.new_with_uri(server_uri)
      @server.ping
    rescue DRb::DRbConnError => e
      raise Server::UnableToStart.new("Couldn't start up the queue server", e) if tries > 4
      sleep 0.1 * (2 ** tries)
      tries += 1
      retry
    end
  end
end

#enqueue(worker, *args) ⇒ Object

Raises:



22
23
24
25
26
27
28
# File 'lib/drb_queue.rb', line 22

def enqueue(worker, *args)
  raise Server::NotStarted, "You must start the server first" unless started?
  raise ArgumentError, "#{worker} is not a module" unless worker.is_a?(Module)
  raise ArgumentError, "#{worker} does not respond to perform" unless worker.respond_to?(:perform)

  server.enqueue(worker, *args)
end

#shutdown!(immediately = false) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/drb_queue.rb', line 55

def shutdown!(immediately = false)
  return unless started?

  synchronize do
    return unless started?

    Process.kill(immediately ? 'KILL' : 'TERM', pid)

    begin
      ::Timeout.timeout(20) { Process.wait }
    rescue Timeout::Error
      Process.kill('KILL', pid)
      Process.wait
      logger.error("#{self}: forced shutdown")
    ensure
      cleanup_socket
      @started = false
      @pid = nil
    end
  end
end

#start!Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/drb_queue.rb', line 30

def start!
  raise Server::AlreadyStarted, "The server is already started" if started?

  synchronize do
    return if started?

    @pid = fork_server
    at_exit { shutdown! }
    @started = true

    begin
      connect_client!
    rescue => e
      shutdown!
      raise
    end
  end
end