Class: Oni::Daemon
Overview
The Daemon class takes care of retrieving work to be processed, scheduling it and dispatching it to a mapper and worker. In essence a Daemon instance can be seen as a controller when compared with typical MVC frameworks.
This daemon starts a number of threads (5 by default) that will each perform work on their own using the corresponding mapper and worker class.
Direct Known Subclasses
Constant Summary collapse
- DEFAULT_WORKER_AMOUNT =
The default amount of worker to start.
1
- DEFAULT_THREAD_AMOUNT =
The default amount of threads to start.
5
- DEFAULT_WORKER_TIMEOUT =
The default amount of threads to start.
nil
Instance Attribute Summary collapse
-
#daemon_workers ⇒ Object
readonly
Returns the value of attribute daemon_workers.
- #workers ⇒ Array<Thread> readonly
Instance Method Summary collapse
-
#complete(message, output) ⇒ Object
Called when a job has been completed, by default this method is a noop.
-
#create_mapper ⇒ Oni::Mapper
Creates a new mapper and passes it a set of arguments as defined in #mapper_arguments.
-
#error(error) ⇒ Object
Called whenever an error is raised in the daemon, mapper or worker.
-
#initialize ⇒ Daemon
constructor
Creates a new instance of the class and calls
#after_initialize
if it is defined. -
#process(message) ⇒ Object
Processes the given message.
-
#receive ⇒ Object
Receives a message, by default this method raises an error.
-
#run_thread ⇒ Object
The main code to execute in individual threads.
-
#run_worker(message) ⇒ Mixed
Maps the input, runs the worker and then maps the output into something that the daemon can understand.
-
#spawn_thread ⇒ Thread
Spawns a new thread that waits for daemon input.
-
#spawn_worker(name = nil, &block) ⇒ Thread
Spawns a new thread that waits for daemon input.
- #standard_worker ⇒ Object
-
#start ⇒ Object
Starts the daemon and waits for all threads to finish execution.
-
#stop ⇒ Object
Terminates all the threads and clears up the list.
-
#threads ⇒ Fixnum
Returns the amount of threads to use.
-
#worker_timeout ⇒ Fixnum
Returns the amount of threads to use.
Methods included from Configurable
included, #option, #require_option!
Constructor Details
#initialize ⇒ Daemon
Creates a new instance of the class and calls #after_initialize
if it
is defined.
43 44 45 46 47 |
# File 'lib/oni/daemon.rb', line 43 def initialize @daemon_workers = Hash.new{ |h, k| h[k] = [] } after_initialize if respond_to?(:after_initialize) end |
Instance Attribute Details
#daemon_workers ⇒ Object (readonly)
Returns the value of attribute daemon_workers.
16 17 18 |
# File 'lib/oni/daemon.rb', line 16 def daemon_workers @daemon_workers end |
#workers ⇒ Array<Thread> (readonly)
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/oni/daemon.rb', line 13 class Daemon include Configurable attr_reader :daemon_workers ## # The default amount of worker to start. # # @return [Fixnum] # DEFAULT_WORKER_AMOUNT = 1 ## # The default amount of threads to start. # # @return [Fixnum] # DEFAULT_THREAD_AMOUNT = 5 ## # The default amount of threads to start. # # @return [Fixnum] # DEFAULT_WORKER_TIMEOUT = nil ## # Creates a new instance of the class and calls `#after_initialize` if it # is defined. # def initialize @daemon_workers = Hash.new{ |h, k| h[k] = [] } after_initialize if respond_to?(:after_initialize) end ## # Starts the daemon and waits for all threads to finish execution. This # method is blocking since it will wait for all threads to finish. # # If the current class has a `before_start` method defined it's called # before starting the daemon. # def start before_start if respond_to? :before_start wthreads = if threads <= 1 then [run_thread] elsif workers <= 1 then standard_worker else wthreads = Array.new(workers){ |i| spawn_worker i } end after_start if respond_to? :after_start %i[INT TERM].each{ |sig| trap(sig){ stop } } wthreads.each(&:join) if workers > 1 rescue => error error(error) end ## # Terminates all the threads and clears up the list. Note that calling this # method acts much like sending a SIGKILL signal to a process: threads will # be shut down *immediately*. # def stop daemon_workers.each do |pid, worker_threads| worker_threads.each(&:kill) worker_threads.clear end exit end def workers option :workers, DEFAULT_WORKER_AMOUNT end ## # Returns the amount of threads to use. # # @return [Fixnum] # def threads option :threads, DEFAULT_THREAD_AMOUNT end ## # Returns the amount of threads to use. # # @return [Fixnum] # def worker_timeout option :worker_timeout, DEFAULT_WORKER_TIMEOUT end ## # Processes the given message. Upon completion the `#complete` method is # called and passed the resulting output. # # @param [Mixed] message # def process() output = run_worker() complete(, output) end ## # Maps the input, runs the worker and then maps the output into something # that the daemon can understand. # # @param [Mixed] message # @return [Mixed] # def run_worker() mapper = create_mapper input = mapper.map_input() worker = option(:worker).new(*input) output = Timeout.timeout worker_timeout do worker.process end mapper.map_output output end ## # Receives a message, by default this method raises an error. # # @raise [NotImplementedError] # def receive raise NotImplementedError, 'You must manually implement #receive' end ## # Called when a job has been completed, by default this method is a noop. # This method is passed 2 arguments: # # 1. The raw input message. # 2. The output of the worker (remapped by the mapper). # # @param [Mixed] message The raw input message (e.g. an AWS SQS message) # @param [Mixed] output The output of the worker. # def complete(, output) end ## # Called whenever an error is raised in the daemon, mapper or worker. By # default this method just re-raises the error. # # @param [StandardError] error # def error(error) raise error end ## # Creates a new mapper and passes it a set of arguments as defined in # {Oni::Daemon#mapper_arguments}. # # @return [Oni::Mapper] # def create_mapper unless option(:mapper) raise ArgumentError, 'No mapper has been set in the `:mapper` option' end return option(:mapper).new end ## # Spawns a new thread that waits for daemon input. # # @return [Thread] # def spawn_worker name = nil, &block Thread.new do pid = nil loop do # keep restarting for OOM and other cases pid = fork do Process.setproctitle "#{$0}: worker #{name}" if name if block then yield else standard_worker end end Process.wait pid end ensure Process.kill :KILL, pid end end def standard_worker Array.new(threads) do spawn_thread.tap{ |t| daemon_workers[Process.pid] << t } end.each(&:join) end ## # Spawns a new thread that waits for daemon input. # # @return [Thread] # def spawn_thread Thread.new{ run_thread }.tap do |t| t.abort_on_exception = true end end ## # The main code to execute in individual threads. # # If an error occurs in the receive method or processing a job the error # handler is executed and the process is retried. It's the responsibility # of the `error` method to determine if the process should fail only once # (and fail hard) or if it should continue running. # def run_thread receive do || process end rescue => error error(error) retry end end |
Instance Method Details
#complete(message, output) ⇒ Object
Called when a job has been completed, by default this method is a noop. This method is passed 2 arguments:
- The raw input message.
- The output of the worker (remapped by the mapper).
156 157 |
# File 'lib/oni/daemon.rb', line 156 def complete(, output) end |
#create_mapper ⇒ Oni::Mapper
Creates a new mapper and passes it a set of arguments as defined in #mapper_arguments.
175 176 177 178 179 180 181 |
# File 'lib/oni/daemon.rb', line 175 def create_mapper unless option(:mapper) raise ArgumentError, 'No mapper has been set in the `:mapper` option' end return option(:mapper).new end |
#error(error) ⇒ Object
Called whenever an error is raised in the daemon, mapper or worker. By default this method just re-raises the error.
165 166 167 |
# File 'lib/oni/daemon.rb', line 165 def error(error) raise error end |
#process(message) ⇒ Object
Processes the given message. Upon completion the #complete
method is
called and passed the resulting output.
113 114 115 116 117 |
# File 'lib/oni/daemon.rb', line 113 def process() output = run_worker() complete(, output) end |
#receive ⇒ Object
Receives a message, by default this method raises an error.
142 143 144 |
# File 'lib/oni/daemon.rb', line 142 def receive raise NotImplementedError, 'You must manually implement #receive' end |
#run_thread ⇒ Object
The main code to execute in individual threads.
If an error occurs in the receive method or processing a job the error
handler is executed and the process is retried. It's the responsibility
of the error
method to determine if the process should fail only once
(and fail hard) or if it should continue running.
229 230 231 232 233 234 235 236 237 |
# File 'lib/oni/daemon.rb', line 229 def run_thread receive do || process end rescue => error error(error) retry end |
#run_worker(message) ⇒ Mixed
Maps the input, runs the worker and then maps the output into something that the daemon can understand.
126 127 128 129 130 131 132 133 134 135 |
# File 'lib/oni/daemon.rb', line 126 def run_worker() mapper = create_mapper input = mapper.map_input() worker = option(:worker).new(*input) output = Timeout.timeout worker_timeout do worker.process end mapper.map_output output end |
#spawn_thread ⇒ Thread
Spawns a new thread that waits for daemon input.
215 216 217 218 219 |
# File 'lib/oni/daemon.rb', line 215 def spawn_thread Thread.new{ run_thread }.tap do |t| t.abort_on_exception = true end end |
#spawn_worker(name = nil, &block) ⇒ Thread
Spawns a new thread that waits for daemon input.
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/oni/daemon.rb', line 188 def spawn_worker name = nil, &block Thread.new do pid = nil loop do # keep restarting for OOM and other cases pid = fork do Process.setproctitle "#{$0}: worker #{name}" if name if block then yield else standard_worker end end Process.wait pid end ensure Process.kill :KILL, pid end end |
#standard_worker ⇒ Object
204 205 206 207 208 |
# File 'lib/oni/daemon.rb', line 204 def standard_worker Array.new(threads) do spawn_thread.tap{ |t| daemon_workers[Process.pid] << t } end.each(&:join) end |
#start ⇒ Object
Starts the daemon and waits for all threads to finish execution. This method is blocking since it will wait for all threads to finish.
If the current class has a before_start
method defined it's called
before starting the daemon.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/oni/daemon.rb', line 56 def start before_start if respond_to? :before_start wthreads = if threads <= 1 then [run_thread] elsif workers <= 1 then standard_worker else wthreads = Array.new(workers){ |i| spawn_worker i } end after_start if respond_to? :after_start %i[INT TERM].each{ |sig| trap(sig){ stop } } wthreads.each(&:join) if workers > 1 rescue => error error(error) end |
#stop ⇒ Object
Terminates all the threads and clears up the list. Note that calling this method acts much like sending a SIGKILL signal to a process: threads will be shut down immediately.
77 78 79 80 81 82 83 |
# File 'lib/oni/daemon.rb', line 77 def stop daemon_workers.each do |pid, worker_threads| worker_threads.each(&:kill) worker_threads.clear end exit end |
#threads ⇒ Fixnum
Returns the amount of threads to use.
94 95 96 |
# File 'lib/oni/daemon.rb', line 94 def threads option :threads, DEFAULT_THREAD_AMOUNT end |
#worker_timeout ⇒ Fixnum
Returns the amount of threads to use.
103 104 105 |
# File 'lib/oni/daemon.rb', line 103 def worker_timeout option :worker_timeout, DEFAULT_WORKER_TIMEOUT end |