Class: Sqew::Manager

Inherits:
Qu::Worker
  • Object
show all
Defined in:
lib/sqew/manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_workers = 3) ⇒ Manager

Returns a new instance of Manager.



5
6
7
8
9
10
11
12
13
14
# File 'lib/sqew/manager.rb', line 5

def initialize(max_workers = 3)
  raise "Configure sqew before starting the manager" unless Sqew.server
  super([])
  @max_workers = max_workers
  @uri = URI.parse(Sqew.server)
  @poll = 1
  @thin_server = nil

  @group = ThreadGroup.new
end

Instance Attribute Details

#max_workersObject

Returns the value of attribute max_workers.



3
4
5
# File 'lib/sqew/manager.rb', line 3

def max_workers
  @max_workers
end

Instance Method Details

#handle_signalsObject



45
46
47
48
49
50
51
52
53
# File 'lib/sqew/manager.rb', line 45

def handle_signals
  logger.debug "Worker #{id} registering traps for INT and TERM signals"
  %W(INT TERM).each do |sig|
    trap(sig) do
      logger.info "Worker #{id} received #{sig}, will wait for workers to finish then quit"
      @exiting = true
    end
  end
end

#pause_workersObject



16
17
18
19
# File 'lib/sqew/manager.rb', line 16

def pause_workers
  @paused_workers ||= @max_workers
  @max_workers = 0
end

#resume_workersObject



21
22
23
24
25
# File 'lib/sqew/manager.rb', line 21

def resume_workers
  if @paused_workers
    @max_workers = @paused_workers
  end
end

#startObject



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/sqew/manager.rb', line 60

def start
  logger.warn "Worker #{id} starting"
  start_slave
  handle_signals
  start_server
  Qu.backend.clear_running
  loop do
    work
    sleep @poll
    
    if @exiting
      stop_server
      @group.list.map {|t| t.join }
      Qu.backend.close
      stop_slave
      break
    end
  end
ensure
  logger.debug "Worker #{id} done"
end

#start_serverObject



32
33
34
35
36
37
38
39
# File 'lib/sqew/manager.rb', line 32

def start_server
  logger.info "Starting server on #{@uri}"
  Thread.new do
    Thin::Logging.silent = true
    @thin_server = Thin::Server.new(@uri.host, @uri.port, Server.new(self), {signals:false})
    @thin_server.start
  end
end

#stop_serverObject



41
42
43
# File 'lib/sqew/manager.rb', line 41

def stop_server
  @thin_server.stop
end

#work_offObject



55
56
57
58
# File 'lib/sqew/manager.rb', line 55

def work_off
  Qu.backend.clear_running
  super
end