Class: Oni::Daemon
Overview
The Daemon class takes care of retrieving work to be processed, scheduling it and dispatching it to a mapper and worker. In essence a Daemon instance can be seen as a controller when compared with typical MVC frameworks.
This daemon starts a number of threads (5 by default) that will each perform work on their own using the corresponding mapper and worker class.
Direct Known Subclasses
Constant Summary collapse
- DEFAULT_THREAD_AMOUNT =
The default amount of threads to start.
5
Instance Attribute Summary collapse
- #workers ⇒ Array<Thread> readonly
Instance Method Summary collapse
-
#complete(message, output, timings) ⇒ Object
Called when a job has been completed, by default this method is a noop.
-
#create_mapper ⇒ Oni::Mapper
Creates a new mapper and passes it a set of arguments as defined in #mapper_arguments.
-
#error(error) ⇒ Object
Called whenever an error is raised in the daemon, mapper or worker.
-
#initialize ⇒ Daemon
constructor
Creates a new instance of the class and calls
#after_initialize
if it is defined. -
#process(message) ⇒ Object
Processes the given message.
-
#receive ⇒ Object
Receives a message, by default this method raises an error.
-
#run_thread ⇒ Object
The main code to execute in individual threads.
-
#run_worker(message) ⇒ Mixed
Maps the input, runs the worker and then maps the output into something that the daemon can understand.
-
#spawn_thread ⇒ Thread
Spawns a new thread that waits for daemon input.
-
#start ⇒ Object
Starts the daemon and waits for all threads to finish execution.
-
#stop ⇒ Object
Terminates all the threads and clears up the list.
-
#threads ⇒ Fixnum
Returns the amount of threads to use.
Methods included from Configurable
included, #option, #require_option!
Constructor Details
#initialize ⇒ Daemon
Creates a new instance of the class and calls #after_initialize
if it
is defined.
29 30 31 32 33 |
# File 'lib/oni/daemon.rb', line 29 def initialize @workers = [] after_initialize if respond_to?(:after_initialize) end |
Instance Attribute Details
#workers ⇒ Array<Thread> (readonly)
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/oni/daemon.rb', line 13 class Daemon include Configurable attr_reader :workers ## # The default amount of threads to start. # # @return [Fixnum] # DEFAULT_THREAD_AMOUNT = 5 ## # Creates a new instance of the class and calls `#after_initialize` if it # is defined. # def initialize @workers = [] after_initialize if respond_to?(:after_initialize) end ## # Starts the daemon and waits for all threads to finish execution. This # method is blocking since it will wait for all threads to finish. # # If the current class has a `before_start` method defined it's called # before starting the daemon. # def start before_start if respond_to?(:before_start) # If we don't have any threads run in non threaded mode. if threads > 0 threads.times do workers << spawn_thread end workers.each(&:join) else run_thread end rescue => error error(error) end ## # Terminates all the threads and clears up the list. Note that calling this # method acts much like sending a SIGKILL signal to a process: threads will # be shut down *immediately*. # def stop workers.each(&:kill) workers.clear end ## # Returns the amount of threads to use. # # @return [Fixnum] # def threads return option(:threads, DEFAULT_THREAD_AMOUNT) end ## # Processes the given message. Upon completion the `#complete` method is # called and passed the resulting output. # # @param [Mixed] message # def process() output = nil timings = Benchmark.measure do output = run_worker() end complete(, output, timings) end ## # Maps the input, runs the worker and then maps the output into something # that the daemon can understand. # # @param [Mixed] message # @return [Mixed] # def run_worker() mapper = create_mapper input = mapper.map_input() worker = option(:worker).new(*input) output = worker.process return mapper.map_output(output) end ## # Receives a message, by default this method raises an error. # # @raise [NotImplementedError] # def receive raise NotImplementedError, 'You must manually implement #receive' end ## # Called when a job has been completed, by default this method is a noop. # This method is passed 3 arguments: # # 1. The raw input message. # 2. The output of the worker (remapped by the mapper). # 3. A Benchmark::Tms instance that contains the timings for processing the # message. # # @param [Mixed] message The raw input message (e.g. an AWS SQS message) # @param [Mixed] output The output of the worker. # @param [Benchmark::Tms] timings # def complete(, output, timings) end ## # Called whenever an error is raised in the daemon, mapper or worker. By # default this method just re-raises the error. # # Note that this callback method is called from a thread in which the # exception occured, not from the main thread. # # @param [StandardError] error # def error(error) raise error end ## # Creates a new mapper and passes it a set of arguments as defined in # {Oni::Daemon#mapper_arguments}. # # @return [Oni::Mapper] # def create_mapper unless option(:mapper) raise ArgumentError, 'No mapper has been set in the `:mapper` option' end return option(:mapper).new end ## # Spawns a new thread that waits for daemon input. # # @return [Thread] # def spawn_thread thread = Thread.new { run_thread } thread.abort_on_exception = true return thread end ## # The main code to execute in individual threads. # # If an error occurs in the receive method or processing a job the error # handler is executed and the process is retried. It's the responsibility # of the `error` method to determine if the process should fail only once # (and fail hard) or if it should continue running. # def run_thread receive { || process() } rescue => error error(error) retry end end |
Instance Method Details
#complete(message, output, timings) ⇒ Object
Called when a job has been completed, by default this method is a noop. This method is passed 3 arguments:
- The raw input message.
- The output of the worker (remapped by the mapper).
- A Benchmark::Tms instance that contains the timings for processing the message.
131 132 |
# File 'lib/oni/daemon.rb', line 131 def complete(, output, timings) end |
#create_mapper ⇒ Oni::Mapper
Creates a new mapper and passes it a set of arguments as defined in #mapper_arguments.
153 154 155 156 157 158 159 |
# File 'lib/oni/daemon.rb', line 153 def create_mapper unless option(:mapper) raise ArgumentError, 'No mapper has been set in the `:mapper` option' end return option(:mapper).new end |
#error(error) ⇒ Object
Called whenever an error is raised in the daemon, mapper or worker. By default this method just re-raises the error.
Note that this callback method is called from a thread in which the exception occured, not from the main thread.
143 144 145 |
# File 'lib/oni/daemon.rb', line 143 def error(error) raise error end |
#process(message) ⇒ Object
Processes the given message. Upon completion the #complete
method is
called and passed the resulting output.
84 85 86 87 88 89 90 91 |
# File 'lib/oni/daemon.rb', line 84 def process() output = nil timings = Benchmark.measure do output = run_worker() end complete(, output, timings) end |
#receive ⇒ Object
Receives a message, by default this method raises an error.
114 115 116 |
# File 'lib/oni/daemon.rb', line 114 def receive raise NotImplementedError, 'You must manually implement #receive' end |
#run_thread ⇒ Object
The main code to execute in individual threads.
If an error occurs in the receive method or processing a job the error
handler is executed and the process is retried. It's the responsibility
of the error
method to determine if the process should fail only once
(and fail hard) or if it should continue running.
182 183 184 185 186 187 188 |
# File 'lib/oni/daemon.rb', line 182 def run_thread receive { || process() } rescue => error error(error) retry end |
#run_worker(message) ⇒ Mixed
Maps the input, runs the worker and then maps the output into something that the daemon can understand.
100 101 102 103 104 105 106 107 |
# File 'lib/oni/daemon.rb', line 100 def run_worker() mapper = create_mapper input = mapper.map_input() worker = option(:worker).new(*input) output = worker.process return mapper.map_output(output) end |
#spawn_thread ⇒ Thread
Spawns a new thread that waits for daemon input.
166 167 168 169 170 171 172 |
# File 'lib/oni/daemon.rb', line 166 def spawn_thread thread = Thread.new { run_thread } thread.abort_on_exception = true return thread end |
#start ⇒ Object
Starts the daemon and waits for all threads to finish execution. This method is blocking since it will wait for all threads to finish.
If the current class has a before_start
method defined it's called
before starting the daemon.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/oni/daemon.rb', line 42 def start before_start if respond_to?(:before_start) # If we don't have any threads run in non threaded mode. if threads > 0 threads.times do workers << spawn_thread end workers.each(&:join) else run_thread end rescue => error error(error) end |
#stop ⇒ Object
Terminates all the threads and clears up the list. Note that calling this method acts much like sending a SIGKILL signal to a process: threads will be shut down immediately.
64 65 66 67 |
# File 'lib/oni/daemon.rb', line 64 def stop workers.each(&:kill) workers.clear end |
#threads ⇒ Fixnum
Returns the amount of threads to use.
74 75 76 |
# File 'lib/oni/daemon.rb', line 74 def threads return option(:threads, DEFAULT_THREAD_AMOUNT) end |