Class: Collection
- Inherits: Object
- Defined in: app/models/collection.rb
Overview
A Collection object encapsulates a given UmlautRequest, and a given list of Umlaut services that should be run off that request.
That’s exactly what it’s initialized with: an Umlaut request and a list of service definitions. An optional third parameter passes in an Umlaut configuration object, which supplies various timeout values. If you don’t pass one in, defaults will be used.
The Collection holds and executes the logic for running those services, foreground and background, making sure no service is run twice if it’s already in progress, timing out expired services, etc.
This code is a mess, sorry.
Instance Attribute Summary
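- #background_service_timeout ⇒ Object
  Config: how long a dispatch may stay queued or in progress before it is marked failed temporary (default 30 seconds).
- #logger ⇒ Object
  Returns the value of attribute logger.
- #requeue_failedtemporary_services_in ⇒ Object
  Config: how long a failed temporary dispatch is kept before it is deleted so the service can be re-queued (default 500 seconds).
- #response_expire_crontab_format ⇒ Object
  Config: optional crontab-format schedule used to expire completed dispatches (default nil).
- #response_expire_interval ⇒ Object
  Config: age after which a completed dispatch is considered expired (default 1 day).
- #umlaut_request ⇒ Object
  Returns the value of attribute umlaut_request.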
Instance Method Summary
- #completed_dispatch_expired?(ds) ⇒ Boolean
- #dispatch_background! ⇒ Object
  Runs background services in priority waves.
- #dispatch_foreground! ⇒ Object
  Runs foreground services in priority waves.
- #dispatch_services! ⇒ Object
  Starts running all services that are in this collection, for the given request set for this collection, if and only if they are not already in progress.
- #freshen_dispatches! ⇒ Object
  Goes through existing DispatchedService objects and freshens them up, for example marking a service failed temporary if it has been in progress longer than the timeout.
- #get_service_definitions(options = {}) ⇒ Object
  Get service definition hashes for services in this institution.
- #initialize(a_umlaut_request, service_hash, config = Confstruct::Configuration.new) ⇒ Collection
  Constructor. a_umlaut_request is an UmlautRequest, representing a request for services for a context object.
- #instantiate_services!(options = {}) ⇒ Object
  Instantiates new copies of services included in this collection; the :task, :level and :ids options select which services.
- #link_out_service_level(level) ⇒ Object
  Deprecated, use #instantiate_services! with :task => Service::LinkOutFilterTask.
- #mark_queued_if_empty! ⇒ Object
  For all configured services, if they have NO DispatchedService object, then create one with status Queued.
- #runnable_services_for_priority(priority, options = {}) ⇒ Object
  All services for priority that are marked Queued, so long as no previous waves are still marked running.
Constructor Details
#initialize(a_umlaut_request, service_hash, config = Confstruct::Configuration.new) ⇒ Collection
- a_umlaut_request is an UmlautRequest, representing a request for services for a context object.
- service_hash is a hash of hashes with service definitions, as would be in services.yml.
- config is a Confstruct::Configuration associated with the current controller. It has a few config options relevant to collection service execution. If you don’t pass one in, a blank one with default values is used.
    # File 'app/models/collection.rb', line 35
    def initialize(a_umlaut_request, service_hash, config = Confstruct::Configuration.new)
      self.umlaut_request = a_umlaut_request

      self.logger = Rails.logger

      self.response_expire_interval = config.lookup!("response_expire_interval", 1.day)
      self.response_expire_crontab_format = config.lookup!("response_expire_crontab_format", nil)
      self.background_service_timeout = config.lookup!("background_service_timeout", 30.seconds)
      self.requeue_failedtemporary_services_in = config.lookup!("requeue_failedtemporary_services_in", 500.seconds)

      # @service_definitions will be a two-level hash, pointing to an array. Task is Standard, LinkOut, etc.
      # { [task] => { [priority_level] => [config1, config2, config3],
      #               [priority_level_2] => [configa], }
      #   [...]
      # }
      @service_definitions_flat = service_hash
      @service_definitions = {}

      # Arrange services by type and priority in @service_definitions
      gather_services
    end
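The two-level layout described in the constructor's comment can be pictured with a small plain-Ruby sketch. The body of gather_services is not shown on this page, so the grouping below is only an assumption about its effect, not the actual implementation. The "task" key, the string task names and the sample service ids are hypothetical stand-ins; the "service_id" and "priority" keys do appear in the code on this page.

```ruby
# Build { task => { priority => [definition, ...] } } from a flat hash
# keyed by service id. Hypothetical helper, not part of Umlaut.
def group_definitions(flat)
  grouped = {}
  flat.each do |service_id, definition|
    config = definition.merge("service_id" => service_id)
    by_priority = (grouped[config["task"]] ||= {})
    (by_priority[config["priority"]] ||= []) << config
  end
  grouped
end

# Sample data only; real definitions would come from services.yml.
flat = {
  "sfx"      => { "task" => "standard", "priority" => 0 },
  "amazon"   => { "task" => "standard", "priority" => "a" },
  "worldcat" => { "task" => "standard", "priority" => "a" },
  "filter"   => { "task" => "link_out_filter", "priority" => 1 }
}

grouped = group_definitions(flat)
grouped["standard"]["a"].map { |c| c["service_id"] }  # ["amazon", "worldcat"]
```

Grouping once at construction time keeps the per-wave lookups cheap: each wave only needs one hash access per task and priority.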
Instance Attribute Details
#background_service_timeout ⇒ Object
Config: how long a dispatch may stay queued or in progress before it is marked failed temporary (default 30 seconds).

    # File 'app/models/collection.rb', line 21
    def background_service_timeout
      @background_service_timeout
    end

#logger ⇒ Object
Returns the value of attribute logger.

    # File 'app/models/collection.rb', line 19
    def logger
      @logger
    end

#requeue_failedtemporary_services_in ⇒ Object
Config: how long a failed temporary dispatch is kept before it is deleted so the service can be re-queued (default 500 seconds).

    # File 'app/models/collection.rb', line 21
    def requeue_failedtemporary_services_in
      @requeue_failedtemporary_services_in
    end

#response_expire_crontab_format ⇒ Object
Config: optional crontab-format schedule used to expire completed dispatches (default nil).

    # File 'app/models/collection.rb', line 21
    def response_expire_crontab_format
      @response_expire_crontab_format
    end

#response_expire_interval ⇒ Object
Config: age after which a completed dispatch is considered expired (default 1 day).

    # File 'app/models/collection.rb', line 21
    def response_expire_interval
      @response_expire_interval
    end

#umlaut_request ⇒ Object
Returns the value of attribute umlaut_request.

    # File 'app/models/collection.rb', line 18
    def umlaut_request
      @umlaut_request
    end
Instance Method Details
#completed_dispatch_expired?(ds) ⇒ Boolean
    # File 'app/models/collection.rb', line 274
    def completed_dispatch_expired?(ds)
      interval = self.response_expire_interval
      crontab = self.response_expire_crontab_format
      now = Time.now

      return nil unless interval || crontab

      expired_interval = interval && (now - ds.created_at > interval)
      expired_crontab = crontab && (now > CronTab.new(crontab).nexttime(ds.created_at))

      return expired_interval || expired_crontab
    end
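As a rough illustration of the interval half of this check, here is a plain-Ruby sketch. It leaves out the crontab branch entirely, and ExpiryDispatch and interval_expired? are hypothetical names that stand in for a DispatchedService record and the method above.

```ruby
# Stand-in for a DispatchedService row; only created_at matters here.
ExpiryDispatch = Struct.new(:created_at)

# Interval-only version of the expiry check. Returns nil when no
# interval is configured, as the method above does when neither
# setting is present.
def interval_expired?(dispatch, interval, now = Time.now)
  return nil unless interval
  (now - dispatch.created_at) > interval
end

one_day = 24 * 60 * 60   # seconds; plain Ruby instead of Rails' 1.day
now     = Time.now
fresh   = ExpiryDispatch.new(now - 60)
stale   = ExpiryDispatch.new(now - 2 * one_day)

interval_expired?(fresh, one_day, now)  # false
interval_expired?(stale, one_day, now)  # true
interval_expired?(stale, nil, now)      # nil
```

Note that the nil return is falsy, so a collection with no expiry configured never treats a completed dispatch as expired.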
#dispatch_background! ⇒ Object
Runs this collection's background services in priority waves ('a' through 'z'). If some services are already running, it will not run services in subsequent waves until they are done. This guards against multiple HTTP requests arriving while services are in progress.
Returns the Thread object used for dispatching background services.
    # File 'app/models/collection.rb', line 103
    def dispatch_background!
      # Now we do some crazy magic, start a Thread to run our background
      # services. We are NOT going to wait for this thread to join,
      # we're going to let it keep doing its thing in the background after
      # we return a response to the browser
      backgroundThread = Thread.new(self, umlaut_request) do | t_collection, t_request|
        # Tell our AR extension not to allow implicit checkouts
        ActiveRecord::Base.forbid_implicit_checkout_for_thread! if ActiveRecord::Base.respond_to?("forbid_implicit_checkout_for_thread!")
        begin
          # Set priority to lower for background thread; may or may not
          # actually have an effect in MRI, unclear, but can't hurt.
          prior = Thread.current.priority
          Thread.current.priority = prior - 1

          # Try to give the thread scheduler another hint, really, get
          # other stuff done before this thread.
          Thread.pass

          force_refresh = false
          ('a'..'z').each do | priority |
            # force refresh only if we just ran some services, otherwise not enough
            # time has gone by to be worthwhile.
            runnable_ids = runnable_services_for_priority(priority, :refresh => force_refresh)
            services_to_run = self.instantiate_services!(:level => priority, :ids => runnable_ids)
            if services_to_run.empty?
              force_refresh = false
              next
            end
            ServiceWave.new(services_to_run , priority).handle(umlaut_request, umlaut_request.session_id)
            force_refresh = true
          end
        rescue Exception => e
          # We are divorced from any HTTP request at this point, and may not
          # have access to an ActiveRecord connection. Not much
          # we can do except log it.
          # If we're catching an exception here, service processing was
          # probably interrupted, which is bad. You should not intentionally
          # raise exceptions to be caught here.
          #
          # Normally even unexpected exceptions were caught inside the ServiceWave,
          # and logged to db as well as logfile if possible, only bugs in ServiceWave
          # itself should wind up caught here.
          Thread.current[:exception] = e
          logger.error("Background Service execution exception: #{e.inspect}\n " + Umlaut::Util.clean_backtrace(e).join("\n "))

          # One exception is in test environment, when we may be intentionally
          # trying to get exceptions to propagate up from ServiceWave to here,
          # and then onward, in order to be caught by tests.
          if self.forward_background_exceptions
            raise e
          end
        end
      end
    end
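The pattern used here (start a thread, return it at once, and park any exception on the thread rather than raising it) can be sketched in plain Ruby. This is a simplified illustration, not Umlaut code: run_waves_in_background and the lambda jobs are hypothetical, and the sketch rescues StandardError where the method above rescues Exception.

```ruby
# Runs each wave in order on a background thread and returns the Thread
# immediately. Any error is stored on the thread instead of propagating.
def run_waves_in_background(waves, results)
  Thread.new do
    begin
      waves.each do |priority, jobs|
        jobs.each { |job| results << [priority, job.call] }
      end
    rescue StandardError => e
      Thread.current[:exception] = e
    end
  end
end

results = Queue.new
waves = {
  "a" => [-> { :first }],
  "b" => [-> { raise "boom" }],
  "c" => [-> { :never_runs }]
}

thread = run_waves_in_background(waves, results)
thread.join                 # the real method does not wait; we do, to inspect
thread[:exception].message  # "boom"
results.size                # 1, because wave "c" never ran
```

A caller that holds the returned Thread can inspect thread[:exception] later, which is what makes the returned object useful in tests.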
#dispatch_foreground! ⇒ Object
Runs this collection's foreground services in priority waves (0 through 9), then reloads the UmlautRequest object in the current thread to pick up any changes made in service threads.
    # File 'app/models/collection.rb', line 83
    def dispatch_foreground!
      # Foreground services
      (0..9).each do | priority |
        services_to_run = self.instantiate_services!(:level => priority, :ids => runnable_services_for_priority(priority))
        next if services_to_run.empty?
        ServiceWave.new(services_to_run , priority).handle(umlaut_request, umlaut_request.session_id)
      end

      # Need to reload the request from db, so it gets changes
      # made by services in threads, so future code (such as view rendering)
      # will see changes.
      umlaut_request.reload
    end
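The wave idea can be sketched without Umlaut at all. ServiceWave itself is not shown on this page, so the one-thread-per-job behavior below is an assumption made for illustration; run_foreground_waves and the lambda jobs are hypothetical.

```ruby
# Runs jobs grouped by priority 0..9. Every job in a wave gets its own
# thread, and the whole wave is joined before the next priority starts.
def run_foreground_waves(jobs_by_priority)
  log = Queue.new
  (0..9).each do |priority|
    jobs = jobs_by_priority.fetch(priority, [])
    next if jobs.empty?
    jobs.map { |job| Thread.new { log << [priority, job.call] } }.each(&:join)
  end
  Array.new(log.size) { log.pop }
end

order = run_foreground_waves({
  3 => [-> { :slow_source }],
  0 => [-> { :resolver }, -> { :catalog }]
})
order.map(&:first)  # [0, 0, 3]
```

Jobs inside one wave may finish in any order, but no job from a later wave starts until the earlier wave has completed.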
#dispatch_services! ⇒ Object
Starts running all services that are in this collection, for the given request set for this collection, if and only if they are not already in progress.
This method can be run on a request multiple times; it will only re-execute services that are executable (not already running, or timed out). That characteristic is relied on when this method is called on a page refresh or background update status check.
Sets all services in collection to have a ‘queued’ status if appropriate. Then actually executes the services that are dispatchable (queued).
Returns the Thread object used for dispatching background services.
    # File 'app/models/collection.rb', line 70
    def dispatch_services!
      freshen_dispatches!
      mark_queued_if_empty!

      dispatch_foreground!

      # return main thread for background services.
      return dispatch_background!
    end
#freshen_dispatches! ⇒ Object
Goes through existing DispatchedService objects, and freshens them up:
- If a service has been marked queued or in progress for longer than the timeout, mark it failed temporary.
- If an existing failed temporary is older than our resurrection time, delete the dispatch (and all its responses), so it can be re-queued.
    # File 'app/models/collection.rb', line 169
    def freshen_dispatches!
      umlaut_request.dispatched_services.each do | ds |
        # go through dispatched_services and set still in progress but too long to failed temporary
        if ( (ds.status == DispatchedService::InProgress ||
              ds.status == DispatchedService::Queued ) &&
             (Time.now - ds.updated_at) > self.background_service_timeout)

          ds.store_exception( Exception.new("background service timed out (took longer than #{self.background_service_timeout} to run); thread assumed dead.")) unless ds.exception_info
          # Fail it temporary, it'll be run again.
          ds.status = DispatchedService::FailedTemporary
          ds.save!
          logger.warn("Background service timed out, thread assumed dead. #{umlaut_request.id} / #{ds.service_id}")
        end

        # go through dispatched_services and delete:
        # 1) old completed dispatches, too old to use.
        # 2) failedtemporary dispatches that are older than our resurrection time
        # -> And all responses associated with those dispatches.
        # After being deleted, they'll end up re-queued.
        if ( (ds.completed? && completed_dispatch_expired?(ds) ) ||
             ( ds.status == DispatchedService::FailedTemporary &&
               (Time.now - ds.updated_at) > self.requeue_failedtemporary_services_in
             )
           )
          # Need to expire. Delete all the service responses, and
          # the DispatchedService record, and service will be automatically
          # run again.
          serv_id = ds.service_id

          umlaut_request.service_responses.each do |response|
            if response.service_id == serv_id
              umlaut_request.service_responses.delete(response)
              response.destroy
            end
          end

          umlaut_request.dispatched_services.destroy(ds)
        end
      end
    end
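The two timeout rules can be modelled on plain data. The sketch below is a simplification under stated assumptions: FreshenDispatch and freshen are hypothetical names, statuses are plain symbols, the expiry of completed dispatches is left out, and updated_at is reset by hand on the assumption that saving the record would normally refresh that timestamp.

```ruby
# Stand-in for DispatchedService; statuses are plain symbols here.
FreshenDispatch = Struct.new(:service_id, :status, :updated_at)

# Returns the dispatches that survive freshening. Timed-out ones are
# flipped to :failed_temporary; old failed-temporary ones are dropped
# so that they can be queued again.
def freshen(dispatches, timeout:, requeue_after:, now: Time.now)
  dispatches.each do |ds|
    stuck = [:queued, :in_progress].include?(ds.status)
    next unless stuck && (now - ds.updated_at) > timeout
    ds.status = :failed_temporary
    ds.updated_at = now  # assumption: mirrors the timestamp refresh on save
  end
  dispatches.reject do |ds|
    ds.status == :failed_temporary && (now - ds.updated_at) > requeue_after
  end
end

now = Time.now
dispatches = [
  FreshenDispatch.new("a", :in_progress, now - 45),        # stuck past the timeout
  FreshenDispatch.new("b", :failed_temporary, now - 600),  # old failure
  FreshenDispatch.new("c", :in_progress, now - 5)          # still healthy
]
kept = freshen(dispatches, timeout: 30, requeue_after: 500, now: now)
kept.map { |ds| [ds.service_id, ds.status] }
# [["a", :failed_temporary], ["c", :in_progress]]
```

With the defaults on this page (30 seconds and 500 seconds), a stuck service is first marked failed temporary and only becomes eligible to run again once the longer requeue period has passed.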
#get_service_definitions(options = {}) ⇒ Object
Get service definition hashes for services in this institution, returned in an array. The returned array may be the same mutable array that Collection mutates internally, so clients really ought not to mutate it. Options:
- :task => Service::StandardTask (default) or Service::LinkOutFilterTask
- :level => priority level; defaults to returning services from all levels.
- :ids => list of service unique ids; return only these.
    # File 'app/models/collection.rb', line 317
    def get_service_definitions(options = {})
      options[:task] ||= Service::StandardTask

      configs_for_task = @service_definitions[ options[:task] ] || {}

      service_configs = case options[:level]
        when nil
          # All of them for this task
          configs_for_task.values.flatten
        else
          configs_for_task[ options[:level] ] || []
      end
      if options[:ids]
        service_configs = service_configs.find_all {|s| options[:ids].include? s["service_id"] }
      end

      return service_configs
    end
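To make the three options concrete, here is the same selection logic applied to plain data. SAMPLE_DEFINITIONS, select_definitions, the string task name and the service ids are all hypothetical; only the "service_id" key and the task/level/ids steps come from the method above.

```ruby
# Hypothetical two-level structure: task => priority => [definitions].
SAMPLE_DEFINITIONS = {
  "standard" => {
    0   => [{ "service_id" => "resolver" }],
    "a" => [{ "service_id" => "covers" }, { "service_id" => "reviews" }]
  }
}.freeze

# Same selection steps as get_service_definitions, on plain data:
# pick the task, then one level (or all levels), then filter by ids.
def select_definitions(definitions, task: "standard", level: nil, ids: nil)
  for_task = definitions[task] || {}
  configs  = level.nil? ? for_task.values.flatten : (for_task[level] || [])
  configs  = configs.select { |c| ids.include?(c["service_id"]) } if ids
  configs
end

select_definitions(SAMPLE_DEFINITIONS).length                         # 3
select_definitions(SAMPLE_DEFINITIONS, level: "a", ids: ["reviews"])  # [{"service_id"=>"reviews"}]
select_definitions(SAMPLE_DEFINITIONS, task: "missing")               # []
```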
#instantiate_services!(options = {}) ⇒ Object
Instantiate new copies of services included in this collection. Which services are instantiated is specified by options, which can be combined:
- :task => Service::StandardTask (default) or Service::LinkOutFilterTask
- :level => priority level; defaults to returning services from all levels.
- :ids => list of ids; only those.
    # File 'app/models/collection.rb', line 295
    def instantiate_services!(options = {})
      get_service_definitions(options).collect do |svc_def|
        ServiceStore.instantiate_service!(svc_def, umlaut_request)
      end
    end
#link_out_service_level(level) ⇒ Object
Deprecated, use #instantiate_services! with :task => Service::LinkOutFilterTask.
    # File 'app/models/collection.rb', line 303
    def link_out_service_level(level)
      instantiate_services!(:task => Service::LinkOutFilterTask,
                            :level => level)
    end
#mark_queued_if_empty! ⇒ Object
For all configured services, if they have NO DispatchedService object, then create one with status Queued
    # File 'app/models/collection.rb', line 215
    def mark_queued_if_empty!
      our_service_ids = self.get_service_definitions.collect {|d| d["service_id"]}
      existing_dispatches = umlaut_request.dispatched_services.collect {|d| d.service_id}

      not_yet_existing = our_service_ids - existing_dispatches

      not_yet_existing.each do |service_id|
        umlaut_request.new_dispatch_object!(service_id, DispatchedService::Queued).save!
      end
    end
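The core of this method is an array difference between configured ids and ids that already have a dispatch. A minimal sketch on plain hashes follows; queue_missing and the sample ids are hypothetical, and no database records are involved.

```ruby
# Returns new queued dispatch stand-ins for every configured service id
# that has no dispatch yet. Existing dispatches are left alone.
def queue_missing(configured_ids, dispatches)
  existing = dispatches.map { |d| d[:service_id] }
  (configured_ids - existing).map { |id| { service_id: id, status: :queued } }
end

dispatches = [{ service_id: "resolver", status: :successful }]
queue_missing(["resolver", "covers", "reviews"], dispatches)
# [{:service_id=>"covers", :status=>:queued}, {:service_id=>"reviews", :status=>:queued}]
```

Because already-dispatched ids are subtracted first, calling this repeatedly for the same request does not create duplicate dispatches.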
#runnable_services_for_priority(priority, options = {}) ⇒ Object
All services for priority that are marked Queued, so long as no previous waves are still marked running.
Pass `:refresh => true` as the second argument to force a trip to the database to get fresh DispatchedService objects.
Returns an array of service ids, or an empty array.
    # File 'app/models/collection.rb', line 234
    def runnable_services_for_priority(priority, options = {})
      DispatchedService.connection_pool.with_connection do
        service_definitions = self.get_service_definitions

        # Make a hash where key is service id, and value is priority.to_s
        service_to_priority = Hash[
          service_definitions.collect do |d|
            [ d["service_id"], d["priority"].to_s ]
          end
        ]

        if options[:refresh]
          # force a refresh
          umlaut_request.dispatched_services(true)
        end

        # If there is any service earlier than this wave still marked InProgress,
        # we're not ready to run this wave, return empty array.
        # Important to avoid race condition on HTTP requests, don't
        # dispatch later background waves unless earlier are actually complete,
        # even on an HTTP status check.
        previous_waves_running = umlaut_request.dispatched_services.find do |ds|
          ds.status == DispatchedService::InProgress &&
            service_to_priority[ ds.service_id ] < priority.to_s
        end.present?
        return [] if previous_waves_running

        # otherwise, the services for this priority are runnable if
        # they are already marked Queued
        # We use .to_a, we want to use the already in memory array, not
        # go to the db here.
        return umlaut_request.dispatched_services.to_a.find_all do |ds|
          ds.status == DispatchedService::Queued &&
            service_to_priority[ ds.service_id ] == priority.to_s
        end.collect {|ds| ds.service_id}
      end
    end
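The gating rule compares priorities as strings, so the digit priorities used for foreground waves sort before the letter priorities used for background waves ("9" < "a" in Ruby). The sketch below reproduces that rule on plain data; GateDispatch, runnable_ids, the symbol statuses and the sample ids are hypothetical stand-ins.

```ruby
# Stand-in for DispatchedService; statuses are plain symbols here.
GateDispatch = Struct.new(:service_id, :status)

# Ids queued for this priority, unless an earlier wave is still running.
def runnable_ids(priority, priorities_by_id, dispatches)
  wave = priority.to_s
  blocked = dispatches.any? do |ds|
    ds.status == :in_progress && priorities_by_id[ds.service_id].to_s < wave
  end
  return [] if blocked

  dispatches
    .select { |ds| ds.status == :queued && priorities_by_id[ds.service_id].to_s == wave }
    .map(&:service_id)
end

priorities = { "resolver" => 0, "covers" => "a", "reviews" => "b" }
dispatches = [
  GateDispatch.new("resolver", :in_progress),
  GateDispatch.new("covers", :queued),
  GateDispatch.new("reviews", :queued)
]

runnable_ids("a", priorities, dispatches)  # [] while the resolver is running
dispatches[0].status = :successful
runnable_ids("a", priorities, dispatches)  # ["covers"]
```

Only an earlier wave that is marked in progress blocks a later one; an earlier wave that is merely queued does not.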