Class: Resque::Pool::Dynamic

Inherits:
Object
  • Object
show all
Defined in:
lib/resque/pool/dynamic/shell.rb,
lib/resque/pool/dynamic.rb,
lib/resque/pool/dynamic/logfile.rb,
lib/resque/pool/dynamic/version.rb

Overview

Add-on to resque-pool to allow manually controlled, dynamic sessions

Defined Under Namespace

Classes: Logfile, Shell

Constant Summary collapse

VERSION =
"0.1.3"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeDynamic

Returns a new instance of Dynamic.



15
16
17
18
# File 'lib/resque/pool/dynamic.rb', line 15

def initialize
  # Make myself a project group leader
  Process.setpgid(0, 0)
end

Instance Attribute Details

#pidInteger, NilClass (readonly)

PID of the resque-pool process, or nil if it’s not running

Returns:

  • (Integer, NilClass)


13
14
15
# File 'lib/resque/pool/dynamic.rb', line 13

def pid
  @pid
end

Class Method Details

.shell(options = {}) ⇒ Object

Run a dynamic resque pool session.

Parameters:

  • options (Hash) (defaults to: {})

    Options passed to the Shell instance

  • opts (Hash)

    a customizable set of options



15
16
17
18
19
20
21
22
23
# File 'lib/resque/pool/dynamic/shell.rb', line 15

def self.shell(options={})
  myself = self.new
  myself.start unless options[:no_start]
  myself.status
  Shell.run(myself, options)
  puts "\nBye!"
ensure
  myself.stop if myself
end

Instance Method Details

#configHash #config(opts) ⇒ Hash

Note:

Default configuration is taken from environment variable “WORKERS”, as interpreted by the #parse_config_string method.

Examples:

config

#=> {"foo"=>1, "bar"=>2}
config :foo => 2, :baz => 7
#=> {"foo"=>2, "baz"=>7, "bar"=>2}
config :bar => 0
#=> {"foo"=>2, "baz"=>7}

Overloads:

  • #configHash

    Show current workers configuration

    Returns:

    • (Hash)
  • #config(opts) ⇒ Hash

    Update workers configuration. If configuration has change and resque-pool is running, it is reloaded.

    Parameters:

    • opts (Hash)

      Dictionary of worker process counts to update (pass 0 or nil as value to delete all workers for a queue from pool)

    Returns:

    • (Hash)

      Updated config



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/resque/pool/dynamic.rb', line 66

def config(args=nil)
  @config ||= parse_config_string(ENV['WORKERS'])

  if args
    oldconfig = config.clone

    args.each do |w, n|
      w = w.to_s
      if n.nil? || n==0
        @config.delete w
      else
        @config[w] = n
      end
    end

    if pid && @config != oldconfig
      write_config
      reload
    end
  end

  @config
end

#config_pathString

Path to the temporary config file

Returns:

  • (String)


29
30
31
# File 'lib/resque/pool/dynamic.rb', line 29

def config_path
  config_tempfile.path
end

#config_tempfileTempfile

Tempfile instance for configuration file

Returns:

  • (Tempfile)


22
23
24
# File 'lib/resque/pool/dynamic.rb', line 22

def config_tempfile
  @config_tempfile ||= Tempfile.new(%w(resque-pool-dynamic .yaml))
end

#has_log?Boolean

True if we have an open log file

Returns:

  • (Boolean)


24
25
26
# File 'lib/resque/pool/dynamic/logfile.rb', line 24

def has_log?
  !!@log
end

#kill!(sig) ⇒ Object

Send signal to a running resque-pool

Parameters:

  • sig (String, Integer)

    Signal name or number



154
155
156
# File 'lib/resque/pool/dynamic.rb', line 154

def kill!(sig)
  Process.kill(sig, pid!)
end

#logLogfile

Open log of resque-pool process

Returns:



17
18
19
20
# File 'lib/resque/pool/dynamic/logfile.rb', line 17

def log
  # pid! call will raise an exception if process is not running yet
  @log ||= Logfile.open(log_path, :pid => pid!)
end

#log_pathString

Logfile path

Returns:

  • (String)

    Path to the log file



9
10
11
12
# File 'lib/resque/pool/dynamic/logfile.rb', line 9

def log_path
  # pid! call will raise an exception if process is not running yet
  @log_path ||= ENV['RESQUE_POOL_LOG'] || "resque-pool.#{pid!}.log"
end

#loop_with_index {|i| ... } ⇒ Object

Loop, providing index to the block

Yields:

  • (i)

    Ever increasing index



97
98
99
100
101
102
103
# File 'lib/resque/pool/dynamic/shell.rb', line 97

def loop_with_index
  index = 0
  loop do
    yield(index)
    index += 1
  end
end

#parse_config_string(spec) ⇒ Hash

Parse workers configuration

Examples:

parse_config_string('foo=1:bar=2:baz,quux=4')
#=> {"baz,quux"=>4, "foo"=>1, "bar"=>2}

Parameters:

  • spec (String)

    Workers specification string Format: queue2=number:queue2=number2:queue3,queue5=number4:*=number5

Returns:

  • (Hash)

    Configuration dictionary



40
41
42
43
44
45
46
# File 'lib/resque/pool/dynamic.rb', line 40

def parse_config_string(spec)
  return {} unless spec
  Hash[ 
    spec.split(':').map { |w|
      k, v = w.split('=') ; [ k, v.to_i ] } 
  ]
end

#pid!Integer

Return pid of running resque-pool or raise an exception

Returns:

  • (Integer)

Raises:

  • (RuntimeError)


147
148
149
# File 'lib/resque/pool/dynamic.rb', line 147

def pid!
  pid or raise "Not started!"
end

#pstreeObject

Show child process tree by calling ‘pstree` system command



189
190
191
192
193
194
195
196
197
198
199
# File 'lib/resque/pool/dynamic.rb', line 189

def pstree
  system case RUBY_PLATFORM
         when /darwin/
           "pstree -w #{pid!}"
         when /linux/
           "pstree -l -a -p #{pid!}"
         else
           "pstree #{pid!}"
         end
  nil
end

#reloadObject

Reload resque-pool configuration



181
182
183
184
185
# File 'lib/resque/pool/dynamic.rb', line 181

def reload
  puts "Reloading resque-pool-master #{pid!} configuration"
  write_config
  kill!('HUP')
end

#run_poolObject

Start resque-pool master This is executed in a forked child process.



100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/resque/pool/dynamic.rb', line 100

def run_pool
  @pid = $$
  ENV["RESQUE_POOL_CONFIG"] = config_path

  $stdin.reopen '/dev/null'
  log = File.new(log_path, "a")
  $stdout.reopen log
  $stderr.reopen log
  $stdout.sync = $stderr.sync = true

  require 'rake'
  Rake::Task['resque:pool'].invoke 
end

#startObject

Start resque-pool, showing startup logs



134
135
136
137
138
139
140
141
142
# File 'lib/resque/pool/dynamic.rb', line 134

def start
  unless pid
    start!
    log.rewind
    log.tail_until /started manager/
  else
    warn "Already started as #{pid}"
  end
end

#start!Integer

Fork a child process for resque-pool master

Returns:

  • (Integer)

    PID of the child process



117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/resque/pool/dynamic.rb', line 117

def start!
  raise "Already running: #{pid}" if pid
  write_config
  @pid = fork { self.run_pool }
  Process.setpgid(pid, 0) # don't broadcast ^C to child

  if 30.times { |i| sleep 1 ; break if File.exist? log_path }
    # Loop will return nil if broken. If value is returned, file does not exist.
    raise "Log file #{log_path} still not present after #{i} seconds, giving up"
    stop!
  end

  pid
end

#statusObject

Show current status



203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/resque/pool/dynamic.rb', line 203

def status
  puts( '',
    "Status: " << ( pid ? "running, pid: #{pid}" : "not running" ),
    "Configuration:",
    YAML::dump(config).lines.grep(/^(?!---\s*$)/).map { |v| "  " << v }
    )
  if pid
    puts "Process tree:"
    pstree
  end
  puts
end

#stopObject

Stop running resque-pool, show shutdown logs



171
172
173
174
175
176
177
# File 'lib/resque/pool/dynamic.rb', line 171

def stop
  puts "Shutting down resque-pool-master #{pid}"
  log.ff if has_log?
  status = stop!
  log.tail if has_log?
  return status
end

#stop!Object

Stop running resque-pool



160
161
162
163
164
165
166
167
# File 'lib/resque/pool/dynamic.rb', line 160

def stop!
  if pid
    kill! "INT"
    wpid, status = Process.wait2(pid)
    @pid = nil
    status
  end
end

#write_configObject

Write temporary configuration file



92
93
94
95
96
# File 'lib/resque/pool/dynamic.rb', line 92

def write_config
  File.open(config_path, 'w') do |cf|
    cf.write(YAML::dump(config))
  end
end