Class: Beehive::Worker
- Inherits:
-
Object
- Object
- Beehive::Worker
- Defined in:
- lib/beehive/worker.rb
Overview
The Worker class is used to retrieve and process jobs (in the background). Whenever a job is received the worker will fork itself and execute the job in that process. This is useful because any errors won’t crash the worker itself plus it will reduce the memory usage as once Ruby allocates memory to a process it’s never released unless that process exits.
Constant Summary collapse
- Options =
Hash containing the default worker options.
{ :logger => ::Logger.new(STDOUT), :daemonize => false, :jobs => [], :wait => 5, :log_level => Logger::WARN, :pid => File.join(Dir.pwd, 'worker.pid') }
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Instance of Beehive::Client, used for communicating with the Redis server.
-
#options ⇒ Object
Hash containing all the custom configuration options.
Instance Method Summary collapse
-
#initialize(redis_options = {}, worker_options = {}) ⇒ Worker
constructor
Creates a new instance of the class, sets all the options and connects to the Redis database.
-
#work ⇒ Object
Waits for available jobs and execute a job whenever one is available.
Constructor Details
#initialize(redis_options = {}, worker_options = {}) ⇒ Worker
Creates a new instance of the class, sets all the options and connects to the Redis database.
Redis#new for more information. with Logger of the standard library. reduces CPU and network usage. Logger::WARN by default.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/beehive/worker.rb', line 56 def initialize( = {}, = {}) @connection = ::Beehive::Client.new() @options = Options.merge() @options[:logger].level = @options[:log_level] @shutdown = false # Check if the given jobs are valid @options[:jobs].each do |job| if !::Beehive::Jobs.key?(job) raise("The job \"#{job}\" is invalid as it could not be found in Beehive::Jobs") end end trap_signals end |
Instance Attribute Details
#connection ⇒ Object (readonly)
Instance of Beehive::Client, used for communicating with the Redis server
29 30 31 |
# File 'lib/beehive/worker.rb', line 29 def connection @connection end |
#options ⇒ Object
Hash containing all the custom configuration options
32 33 34 |
# File 'lib/beehive/worker.rb', line 32 def @options end |
Instance Method Details
#work ⇒ Object
Waits for available jobs and execute a job whenever one is available.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/beehive/worker.rb', line 82 def work # Daemonize the process? if @options[:daemonize] === true Process.daemon(true) end @worker_pid = Process.pid @options[:logger].info("Starting main worker, PID: #{@worker_pid}") write_pid loop do if @shutdown === true @options[:logger].info('The party has ended, time to shut down') @connection.disconnect File.unlink(@options[:pid]) exit end # Reset the child PID @child_pid = nil # Check if there are any jobs available @options[:jobs].each do |job| params = @connection.get(job) if params @options[:logger].info( "Received the job \"#{job}\" with the following data: #{params.inspect}" ) # Fork the process and run the job @child_pid = Process.fork do @options[:logger].info('Process forked, executing job') begin ::Beehive::Jobs[job].call(params, @options[:logger]) @options[:logger].info('Job successfully processed') exit rescue => e @options[:logger].error("Failed to execute the job: #{e.inspect}") end end # Wait for the job to finish. This prevents this worker from spawning a worker # for each job it has to process (which could result in hundreds of processes # being spawned. Process.waitpid(@child_pid) end end # Did the PID change for some reason? if Process.pid != @worker_pid @worker_pid = Process.pid write_pid end # Reduces CPU load and network traffic sleep(@options[:wait]) end end |