Class: Collection

Inherits:
Object
Defined in:
app/models/collection.rb

Overview

A Collection object encapsulates a given UmlautRequest and a given list of Umlaut services that should be run for that request.

That’s exactly what it’s initialized with: an Umlaut request and a list of service definitions. An optional third parameter takes an Umlaut configuration object, which supplies various timeout values. If you don’t pass one in, defaults will be used.

The Collection holds and executes the logic for running those services, foreground and background, making sure no service is run twice if it’s already in progress, timing out expired services, etc.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(a_umlaut_request, service_hash, config = Confstruct::Configuration.new) ⇒ Collection

  • a_umlaut_request is an UmlautRequest, representing a request for services for a context object.
  • service_hash is a hash of hashes with service definitions, as would be found in services.yml.
  • config is a Confstruct::Configuration associated with the current controller. It holds a few config options relevant to collection service execution. If you don’t pass one in, a blank one with default values is used.



# File 'app/models/collection.rb', line 28

def initialize(a_umlaut_request, service_hash, config = Confstruct::Configuration.new)    
  self.umlaut_request = a_umlaut_request
  
  self.logger = Rails.logger
  
  self.response_expire_interval = config.lookup!("response_expire_interval", 1.day)
  self.response_expire_crontab_format = config.lookup!("response_expire_crontab_format", nil)
  self.background_service_timeout =  config.lookup!("background_service_timeout", 30.seconds)
  self.requeue_failedtemporary_services = config.lookup!("requeue_failedtemporary_services", 500.seconds)
      
  # @service_definitions will be a two-level hash pointing to arrays.
  # Task is Standard, LinkOut, etc.
  # { [task] => { [priority_level]   => [config1, config2, config3],
  #               [priority_level_2] => [configa] },
  #   [...]
  # }
  @service_definitions_flat = service_hash
  @service_definitions = {}

  # Arrange services by type and priority in @service_definitions     
  gather_services
end
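
The two-level layout described in the constructor comment can be sketched in plain Ruby. gather_services itself is not shown on this page, so the helper below is a hypothetical stand-in: it assumes the flat hash is keyed by service id, and that each definition may carry "task" and "priority" keys. Those key names and the default values are assumptions made for illustration only.

```ruby
# Hypothetical stand-in for gather_services. The key names "task" and
# "priority", and their defaults, are assumptions for illustration.
def arrange_by_task_and_priority(flat_definitions)
  arranged = {}
  flat_definitions.each do |service_id, definition|
    config   = definition.merge("service_id" => service_id)
    task     = config.fetch("task", "standard")
    priority = config.fetch("priority", 0)
    # Build { task => { priority => [config, ...] } } on demand.
    ((arranged[task] ||= {})[priority] ||= []) << config
  end
  arranged
end

flat = {
  "sfx"    => { "priority" => 0 },
  "amazon" => { "priority" => "a" },
  "filter" => { "task" => "link_out_filter", "priority" => 1 }
}
arranged = arrange_by_task_and_priority(flat)
```

Grouping once at construction time means later lookups by task and level are simple hash reads.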

Instance Attribute Details

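#background_service_timeout ⇒ Object

Configuration value: how long a queued or in-progress service may go without an update before it is considered timed out. Set in the constructor from the background_service_timeout config key (default: 30.seconds).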



# File 'app/models/collection.rb', line 19

def background_service_timeout
  @background_service_timeout
end

#logger ⇒ Object

Returns the value of attribute logger.



# File 'app/models/collection.rb', line 17

def logger
  @logger
end

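#requeue_failedtemporary_services ⇒ Object

Configuration value: how long a service stays in the FailedTemporary state before its dispatch record is deleted and the service is queued again. Set in the constructor from the requeue_failedtemporary_services config key (default: 500.seconds).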



# File 'app/models/collection.rb', line 19

def requeue_failedtemporary_services
  @requeue_failedtemporary_services
end

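#response_expire_crontab_format ⇒ Object

Configuration value: an optional crontab-format string. A completed dispatch expires once the next crontab time after its creation has passed. Set in the constructor from the response_expire_crontab_format config key (default: nil).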



# File 'app/models/collection.rb', line 19

def response_expire_crontab_format
  @response_expire_crontab_format
end

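#response_expire_interval ⇒ Object

Configuration value: the maximum age of a completed dispatch before its responses are considered expired. Set in the constructor from the response_expire_interval config key (default: 1.day).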



# File 'app/models/collection.rb', line 19

def response_expire_interval
  @response_expire_interval
end

#umlaut_request ⇒ Object

Returns the value of attribute umlaut_request.



# File 'app/models/collection.rb', line 16

def umlaut_request
  @umlaut_request
end

Instance Method Details

#completed_dispatch_expired?(ds) ⇒ Boolean

Returns:

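  • (Boolean) true if the completed dispatch ds is older than response_expire_interval, or if the next response_expire_crontab_format time after its creation has passed; nil if neither setting is configured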


# File 'app/models/collection.rb', line 222

def completed_dispatch_expired?(ds)
  interval = self.response_expire_interval
  crontab = self.response_expire_crontab_format
  now = Time.now
  
  return nil unless interval || crontab 
  
  expired_interval = interval && (now - ds.created_at > interval)
  expired_crontab = crontab && (now > CronTab.new(crontab).nexttime(ds.created_at))
  
  return expired_interval || expired_crontab
end
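
The interval half of this check can be tried in isolation. The sketch below uses plain seconds instead of ActiveSupport durations and leaves the crontab half out; interval_expired? is an illustrative name and not part of Collection.

```ruby
# Interval half of the expiry check only; the crontab half is omitted.
# interval_expired? is an illustrative helper, not a Collection method.
def interval_expired?(created_at, interval_seconds, now = Time.now)
  # Mirror the original: with nothing configured there is no answer.
  return nil if interval_seconds.nil?
  (now - created_at) > interval_seconds
end

one_day = 24 * 60 * 60
now     = Time.at(1_000_000)

fresh = interval_expired?(now - 60, one_day, now)            # created a minute ago
stale = interval_expired?(now - (2 * one_day), one_day, now) # created two days ago
```

Passing now explicitly keeps the comparison deterministic, which is handy when testing expiry logic.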

#dispatch_background!(queued_service_ids) ⇒ Object

Call prepare_for_dispatch! first; its return value is suitable as the argument for this call. queued_service_ids is a list of service ids already identified as suitable for running, and marked queued in the DispatchedService table.

Will run such services in background priority waves.



# File 'app/models/collection.rb', line 97

def dispatch_background!(queued_service_ids)
  # Now we do some crazy magic, start a Thread to run our background
  # services. We are NOT going to wait for this thread to join,
  # we're going to let it keep doing its thing in the background after
  # we return a response to the browser
  backgroundThread = Thread.new(self, umlaut_request) do | t_collection,  t_request|
    
    # Tell our AR extension not to allow implicit checkouts
    ActiveRecord::Base.forbid_implicit_checkout_for_thread! if ActiveRecord::Base.respond_to?("forbid_implicit_checkout_for_thread!")
    
    begin
      # Set priority to lower for background thread; may or may not
      # actually have an effect in MRI, unclear, but can't hurt. 
      prior = Thread.current.priority
      Thread.current.priority = prior - 1          
      
      # Try to give the thread scheduler another hint, really, get
      # other stuff done before this thread. 
      Thread.pass
      
      
      ('a'..'z').each do | priority |
        services_to_run = self.instantiate_services!(:level => priority, :ids => queued_service_ids)
        next if services_to_run.empty?
        ServiceWave.new(services_to_run , priority).handle(umlaut_request, umlaut_request.session_id)
      end
    rescue Exception => e
      # We are divorced from any HTTP request at this point, and may not
      # have access to an ActiveRecord connection. Not much
      # we can do except log it. 
      # If we're catching an exception here, service processing was
      # probably interrupted, which is bad. You should not intentionally
      # raise exceptions to be caught here.
      #
      # Normally even unexpected exceptions were caught inside the ServiceWave,
      # and logged to db as well as logfile if possible, only bugs in ServiceWave
      # itself should wind up caught here. 
      Thread.current[:exception] = e
      logger.error("Background Service execution exception: #{e}\n\n   " + clean_backtrace(e).join("\n"))                
    end
  end
end
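
The fire-and-forget pattern used here (run work in a thread, and on failure stash the exception in a thread-local and log it) can be sketched without Rails. run_in_background and the log array are illustrative names. Unlike the method above, this sketch rescues StandardError rather than Exception, which is a deliberate simplification.

```ruby
# Sketch of the background pattern: the thread records any failure in
# Thread.current[:exception] and in a log, instead of raising.
# run_in_background and log are illustrative names only.
def run_in_background(log)
  Thread.new do
    begin
      yield
    rescue StandardError => e
      Thread.current[:exception] = e
      log << "Background Service execution exception: #{e.message}"
    end
  end
end

log = []
ok_thread  = run_in_background(log) { :done }
bad_thread = run_in_background(log) { raise "service blew up" }

# The real caller does not join; joining here only makes the example
# deterministic so the results can be inspected.
[ok_thread, bad_thread].each(&:join)
```

After the join, bad_thread[:exception] holds the rescued error, while ok_thread[:exception] is nil.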

#dispatch_foreground!(queued_service_ids) ⇒ Object

Call prepare_for_dispatch! first; its return value is suitable as the argument for this call. queued_service_ids is a list of service ids already identified as suitable for running, and marked queued in the DispatchedService table.

Will run such services in foreground priority waves, and then reload the UmlautRequest object in the current thread, to pick up any changes made in service threads.



# File 'app/models/collection.rb', line 77

def dispatch_foreground!(queued_service_ids)
  # Foreground services
  (0..9).each do | priority |      
    services_to_run = self.instantiate_services!(:level => priority, :ids => queued_service_ids)
    next if services_to_run.empty?      
    ServiceWave.new(services_to_run , priority).handle(umlaut_request, umlaut_request.session_id)
  end
  
  # Need to reload the request from db, so it gets changes
  # made by services in threads, so future code (such as view rendering)
  # will see changes.     
  umlaut_request.reload
end
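
The wave ordering can be illustrated with plain data: foreground levels are 0 through 9, background levels are 'a' through 'z', and a level with no queued services is skipped. In this sketch plain hashes stand in for service definitions, and each wave is only recorded rather than handed to a ServiceWave; run_in_waves is a hypothetical helper.

```ruby
# Illustrative only: hashes stand in for service definitions, and each
# wave is recorded as [level, ids] instead of being executed.
def run_in_waves(definitions, levels, queued_ids)
  executed = []
  levels.each do |level|
    wave = definitions.select do |d|
      d["priority"] == level && queued_ids.include?(d["service_id"])
    end
    next if wave.empty? # nothing queued at this level
    executed << [level, wave.map { |d| d["service_id"] }]
  end
  executed
end

definitions = [
  { "service_id" => "sfx",     "priority" => 0 },
  { "service_id" => "catalog", "priority" => 2 },
  { "service_id" => "covers",  "priority" => "a" },
  { "service_id" => "skipped", "priority" => 0 }
]
queued = ["sfx", "catalog", "covers"]

foreground = run_in_waves(definitions, (0..9).to_a, queued)
background = run_in_waves(definitions, ("a".."z").to_a, queued)
```

Here "skipped" never runs because it is not in the queued list, which corresponds to a service that is already in progress or complete.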

#dispatch_services!Object

Starts running all services that are in this collection, for the given request set for this collection, if and only if they are not already in progress.

This method can be run on a request multiple times; it will only re-execute services that are executable (not already running, or timed out). That characteristic is used when this method is called on a page refresh or background update status check.

Sets all services in collection to have a ‘queued’ status if appropriate. Then actually executes the services that are dispatchable (queued).



# File 'app/models/collection.rb', line 61

def dispatch_services!
  queued_service_ids = prepare_for_dispatch!
      
  dispatch_foreground!(queued_service_ids)
      
  dispatch_background!(queued_service_ids)        
end

#get_service_definitions(options = {}) ⇒ Object

Get service definition hashes for services in this collection, filtered by options and returned in an array. The returned array may be one that Collection mutates internally, so clients should not mutate it. Options:

  • :task => Service::StandardTask (default) or Service::LinkOutFilterTask
  • :level => priority level; defaults to returning services from all levels.
  • :ids => list of service unique ids; return only these.



# File 'app/models/collection.rb', line 265

def get_service_definitions(options = {})
  options[:task] ||= Service::StandardTask
  
  configs_for_task = @service_definitions[ options[:task] ] || {}
  
  service_configs = case options[:level]
                      when nil
                        # All of them for this task
                        configs_for_task.values.flatten
                      else
                        configs_for_task[ options[:level] ] || []
                    end
  if options[:ids]
    service_configs = service_configs.find_all {|s| options[:ids].include? s["service_id"] }
  end

  return service_configs
end
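
How the three options combine is easiest to see on stand-in data. The sketch below reimplements the same filtering as a free function over a { task => { level => [configs] } } hash. select_definitions is a hypothetical name, and the :standard key is a placeholder for Service::StandardTask.

```ruby
# Stand-in data; :standard is a placeholder for Service::StandardTask.
definitions = {
  :standard => {
    0   => [{ "service_id" => "sfx" }, { "service_id" => "catalog" }],
    "a" => [{ "service_id" => "covers" }]
  }
}

# Hypothetical free-function version of the same filtering logic.
def select_definitions(definitions, options = {})
  task    = options.fetch(:task, :standard)
  by_task = definitions[task] || {}
  configs = if options[:level].nil?
              by_task.values.flatten          # every level for this task
            else
              by_task[options[:level]] || []  # one level only
            end
  configs = configs.select { |c| options[:ids].include?(c["service_id"]) } if options[:ids]
  configs
end

all_ids   = select_definitions(definitions).map { |c| c["service_id"] }
level_ids = select_definitions(definitions, :level => 0).map { |c| c["service_id"] }
picked    = select_definitions(definitions, :level => 0, :ids => ["catalog"]).map { |c| c["service_id"] }
```

The :ids filter is applied last, so it narrows whatever :task and :level already selected.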

#instantiate_services!(options = {}) ⇒ Object

Instantiate new copies of services included in this collection. Which services are instantiated is specified by options, which can be combined:

  • :task => Service::StandardTask (default) or Service::LinkOutFilterTask
  • :level => priority level; defaults to returning services from all levels.
  • :ids => list of service ids; return only these.



# File 'app/models/collection.rb', line 243

def instantiate_services!(options ={})
  get_service_definitions(options).collect do |svc_def|
    ServiceStore.instantiate_service!(svc_def, umlaut_request)
  end
end

#link_out_service_level(level) ⇒ Object

Deprecated, use #instantiate_services! with :task => Service::LinkOutFilterTask.



# File 'app/models/collection.rb', line 251

def link_out_service_level(level)
  instantiate_services!(:task => Service::LinkOutFilterTask, 
                        :level => level)
end

#prepare_for_dispatch!Object

Goes through all services and marks them with a DispatchedService record in ‘queued’ state.

Will time out any too-old services in a running state.

Will remove DispatchedService status for any services marked failed that are old enough to re-run, or services that are too old to re-use. Such services are then queuable.

Returns an array of service identifiers for services that are now queued and executable.



# File 'app/models/collection.rb', line 152

def prepare_for_dispatch!
  # Go through currently dispatched services, looking for timed out
  # services -- services still in progress that have taken too long,
  # as well as service responses that are too old to be used.
  queued_service_ids = []
  DispatchedService.transaction do
    umlaut_request.dispatched_services.each do | ds |
        # go through dispatched_services and set those still in progress for too long to failed temporary
        if ( (ds.status == DispatchedService::InProgress ||
              ds.status == DispatchedService::Queued ) &&
              (Time.now - ds.updated_at) > self.background_service_timeout)

              ds.store_exception( Exception.new("background service timed out (took longer than #{self.background_service_timeout} to run); thread assumed dead.")) unless ds.exception_info
              # Fail it temporary, it'll be run again. 
              ds.status = DispatchedService::FailedTemporary
              ds.save!
              logger.warn("Background service timed out, thread assumed dead. #{umlaut_request.id} / #{ds.service_id}")
         end
         
        # go through dispatched_services and delete:
        # 1) old completed dispatches, too old to use. 
        # 2) failedtemporary dispatches that are older than our resurrection time
        # -> And all responses associated with those dispatches. 
        # After being deleted, they'll end up re-queued. 
        if ( (ds.completed? && completed_dispatch_expired?(ds) ) ||
             (  ds.status == DispatchedService::FailedTemporary &&
               (Time.now - ds.updated_at) > self.requeue_failedtemporary_services 
              )
            )
            
          # Need to expire. Delete all the service responses, and
          # the DispatchedService record, and service will be automatically
          # run again.
          serv_id = ds.service_id
          
          umlaut_request.service_responses.each do |response|            
            if response.service_id == serv_id
              umlaut_request.service_responses.delete(response)              
              response.destroy
            end
          end
          
          umlaut_request.dispatched_services.delete(ds)
          ds.destroy
        end
    end
        
    # Queue any services without a dispatch marker at all, keeping
    # track of queued services, already existing or newly created. 
    
    # Just in case, we're going to refetch dispatched_services from the db,
    # in case some other http request or background service updated things
    # recently. 
    umlaut_request.dispatched_services.reset
    
    self.get_service_definitions.each do |service|
      service_id = service['service_id']
      # use in-memory #to_a search, don't go to db each time!
      if found = umlaut_request.dispatched_services.to_a.find {|s| s.service_id == service_id}
        queued_service_ids.push(service_id) if found.status == DispatchedService::Queued
      else
        umlaut_request.new_dispatch_object!(service_id, DispatchedService::Queued).save!
        queued_service_ids.push(service_id)
      end        
    end
  end
    
  return queued_service_ids
end
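
The per-record decisions made in the first loop can be summarized as a small decision function. This is a simplified sketch: symbols stand in for the DispatchedService status constants, next_action is a hypothetical name, and the default timeouts of 30 and 500 seconds simply mirror the constructor defaults. In the real method the two checks run one after the other on the same record, which this sketch collapses into a single result.

```ruby
# Simplified decision table for one dispatch record. Status symbols and
# next_action are illustrative stand-ins, not Umlaut identifiers.
def next_action(status, age_seconds, timeout: 30, requeue_after: 500, expired: false)
  if [:in_progress, :queued].include?(status) && age_seconds > timeout
    :fail_temporary        # thread assumed dead; will be retried later
  elsif status == :completed && expired
    :delete_and_requeue    # responses too old to reuse
  elsif status == :failed_temporary && age_seconds > requeue_after
    :delete_and_requeue    # waited long enough, try again
  else
    :keep
  end
end

stuck    = next_action(:in_progress, 45)
retrying = next_action(:failed_temporary, 600)
waiting  = next_action(:failed_temporary, 100)
stale    = next_action(:completed, 10, expired: true)
```

Any record that ends up deleted has no dispatch marker left, so the second loop creates a fresh Queued record for it.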