Class: ServiceWave
- Inherits:
-
Object
- Object
- ServiceWave
- Defined in:
- app/models/service_wave.rb
Overview
ServiceWave is basically responsible for multi-threaded execution of a list of services, all in the same priority. Generally it’s only called by Collection, nobody else needs it directly.
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#priority_level ⇒ Object
Returns the value of attribute priority_level.
-
#services ⇒ Object
Returns the value of attribute services.
Instance Method Summary collapse
- #forward_exceptions=(f) ⇒ Object
- #forward_exceptions? ⇒ Boolean
- #handle(request, session_id) ⇒ Object
-
#initialize(service_objects, priority_level = nil, config = UmlautController.umlaut_config) ⇒ ServiceWave
constructor
Priority level is purely information, used for debug output.
-
#prepare_dispatch!(request, service) ⇒ Object
Safe to call in a thread.
Constructor Details
#initialize(service_objects, priority_level = nil, config = UmlautController.umlaut_config) ⇒ ServiceWave
Priority level is purely information, used for debug output.
13 14 15 16 17 18 19 20 21 22 23 |
# File 'app/models/service_wave.rb', line 13 def initialize(service_objects, priority_level = nil, config = UmlautController.umlaut_config) @services = service_objects @priority_level = priority_level @config = config @log_timing = config.lookup!("log_service_timing", true) # Don't forward exceptions, that'll interrupt other service processing. # Catch the exception, record it in the dispatch table, done. May want # to set this to true for debugging/development, but NOT for production. @forward_exceptions = false end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
10 11 12 |
# File 'app/models/service_wave.rb', line 10 def config @config end |
#priority_level ⇒ Object
Returns the value of attribute priority_level.
9 10 11 |
# File 'app/models/service_wave.rb', line 9 def priority_level @priority_level end |
#services ⇒ Object
Returns the value of attribute services.
8 9 10 |
# File 'app/models/service_wave.rb', line 8 def services @services end |
Instance Method Details
#forward_exceptions=(f) ⇒ Object
160 161 162 |
# File 'app/models/service_wave.rb', line 160 def forward_exceptions=(f) @foward_excpetions = f end |
#forward_exceptions? ⇒ Boolean
157 158 159 |
# File 'app/models/service_wave.rb', line 157 def forward_exceptions? return @forward_exceptions end |
#handle(request, session_id) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'app/models/service_wave.rb', line 31 def handle(request, session_id) return if (@services.nil? || @services.empty?) bundle_start = Time.now Rails.logger.info(TermColor.color("Umlaut: Launching service wave #{@priority_level} #{'(non-threaded)' unless config.lookup!("threaded_service_wave", true) }", :yellow) + ", request #{request.id}") if @log_timing threads = [] some_service_executed = false @services.each do | service | some_service_executed = true local_request = nil service_start = Time.now if config.lookup!("threaded_service_wave", true) threads << Thread.new(request.id, service.clone) do | request_id, local_service | # Deal with ruby's brain dead thread scheduling by setting # bg threads to a lower priority so they don't interfere with fg # threads. Thread.current.priority = -1 # Save some things in thread local hash useful for debugging Thread.current[:debug_name] = local_service.class.name Thread.current[:service] = service # Tell our AR extension not to allow implicit checkouts ActiveRecord::Base.forbid_implicit_checkout_for_thread! if ActiveRecord::Base.respond_to?("forbid_implicit_checkout_for_thread!") begin local_request = Request.connection_pool.with_connection do # We are attempting to pre-load all relationships, both for efficiency, # and so our thread can use them all without needing to checkout # an ActiveRecord connection. req = Request.includes({:referent => :referent_values}, :service_responses, :dispatched_services).find(request_id) # It turns out even though referent.referent_values is loaded from the db, on first # access Rails will still access #connection, triggering a checkout. We force # that to happen here, in our with_connection, so it won't happen later. # # Yeah, this is a hacky mess, ActiveRecord isn't happy using it the way we are. # Should we just surround all of handle_wrapper in a with_connection checkout? # Maybe. But it would require more connections in the pool to have those # longer checkouts. req.referent.referent_values req.service_responses req.dispatched_services req end if prepare_dispatch!(local_request, local_service) local_service.handle_wrapper(local_request) else Rails.logger.info("NOT launching service #{local_service.service_id}, level #{@priority_level}, request #{local_request.id}: not in runnable state") if @log_timing end rescue StandardError => e # We may not be able to access ActiveRecord because it may # have been an AR connection error, perhaps out of connections # in the pool. So log and record in non-AR ways. # the code waiting on our thread will see exception # reported in Thread local var, and log it AR if possible. # Log it too, although experience shows it may never make it to the # log for mysterious reasons. log_msg = TermColor.color("Umlaut: Threaded service raised exception.", :red, true) + " Service: #{service.service_id}, #{e.class} #{e.}. Backtrace:\n #{clean_backtrace(e).join("\n ")}" Rails.logger.error(log_msg) # And stick it in a thread variable too Thread.current[:exception] = e # And try to re-raise if it's one we really don't want to swallow. # Sorry, a mess. raise e if defined?(VCR::Errors::UnhandledHTTPRequestError) && e.kind_of?(VCR::Errors::UnhandledHTTPRequestError) ensure Rails.logger.info(TermColor.color("Umlaut: Completed service #{local_service.service_id}", :yellow)+ ", level #{@priority_level}, request #{local_request && local_request.id}: in #{Time.now - service_start}.") if @log_timing end end else # not threaded begin if prepare_dispatch!(request, service) service.handle_wrapper(request) else Rails.logger.info("NOT launching service #{service.service_id}, level #{@priority_level}, request #{request.id}: not in runnable state") if @log_timing end ensure Rails.logger.info(TermColor.color("Umlaut: Completed service #{service.service_id}", :yellow)+ ", level #{@priority_level}, request #{request && request.id}: in #{Time.now - service_start}.") if @log_timing end end end # Wait for all the threads to complete, if any. threads.each do |aThread| aThread.join if aThread[:exception] debugger if ENV["UMLAUT_AUTO_DEBUGGER"] begin request.dispatched(aThread[:service], DispatchedService::FailedFatal, aThread[:exception]) rescue Exception => e debugger if ENV["UMLAUT_AUTO_DEBUGGER"] raise e end end # Okay, raise if exception, if desired. if ( aThread[:exception] && self.forward_exceptions? ) raise aThread[:exception] end end threads.clear # more paranoia Rails.logger.info(TermColor.color("Umlaut: Completed service wave #{@priority_level}", :yellow) + ", request #{request.id}: in #{Time.now - bundle_start}") if some_service_executed && @log_timing end |
#prepare_dispatch!(request, service) ⇒ Object
Safe to call in a thread. Returns true or false depending on whether dispatch should proceed.
27 28 29 |
# File 'app/models/service_wave.rb', line 27 def prepare_dispatch!(request, service) return request.register_in_progress(service) end |