Class: Collection

Inherits:
Object
Defined in:
app/models/collection.rb

Overview

A Collection object encapsulates a given UmlautRequest, and a given list of Umlaut services that should be run off that request.

That’s exactly what it’s initialized with: an Umlaut request and a list of service definitions. An optional third parameter passes in an Umlaut configuration object, which supplies various timeout values. If you don’t pass one in, defaults will be used.

The Collection holds and executes the logic for running those services, foreground and background, making sure no service is run twice if it’s already in progress, timing out expired services, etc.

This code is a mess, sorry.

Constructor Details

#initialize(a_umlaut_request, service_hash, config = Confstruct::Configuration.new) ⇒ Collection

Parameters:

  • a_umlaut_request is an UmlautRequest, representing a request for services for a context object.

  • service_hash is a hash of hashes with service definitions, as would be in services.yml.

  • config is a Confstruct::Configuration associated with the current controller. It has a few config options relevant to collection service execution. If you don’t pass one in, a blank one with default values is used.



# File 'app/models/collection.rb', line 35

def initialize(a_umlaut_request, service_hash, config = Confstruct::Configuration.new) 
  self.umlaut_request = a_umlaut_request

  self.logger = Rails.logger

  self.response_expire_interval = config.lookup!("response_expire_interval", 1.day)
  self.response_expire_crontab_format = config.lookup!("response_expire_crontab_format", nil)
  self.background_service_timeout =  config.lookup!("background_service_timeout", 30.seconds)
  self.requeue_failedtemporary_services_in = config.lookup!("requeue_failedtemporary_services_in", 500.seconds)

  # @service_definitions will be a two-level hash whose values are arrays. Task is Standard, LinkOut, etc.
  # { [task] => { [priority_level] => [config1, config2, config3],
  #                [priority_level_2] => [configa], }
  #     [...]
  # }
  @service_definitions_flat = service_hash
  @service_definitions = {}

  # Arrange services by type and priority in @service_definitions
  gather_services
end
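
The constructor hands the flat service_hash to gather_services, whose source is not shown on this page. As a rough sketch of the two-level layout described in the comment above, the grouping might look like the following. The helper name group_services_sketch, the "task" and "priority" keys, and the "standard" default are all assumptions made for illustration, not Umlaut's actual implementation.

```ruby
# Hypothetical helper, NOT Umlaut's gather_services. Groups a flat
# { service_id => definition } hash into { task => { priority => [definitions] } }.
# The "task" and "priority" keys and the "standard" default are assumptions.
def group_services_sketch(service_hash)
  grouped = {}
  service_hash.each do |service_id, definition|
    # Copy the definition and record its id so later lookups by "service_id" work.
    config = definition.merge("service_id" => service_id)
    task = config.fetch("task", "standard")
    priority = config.fetch("priority")
    grouped[task] ||= {}
    grouped[task][priority] ||= []
    grouped[task][priority] << config
  end
  grouped
end

flat = {
  "SFX"    => { "priority" => 1 },
  "Amazon" => { "priority" => "a" },
  "Filter" => { "priority" => 1, "task" => "link_out_filter" }
}

grouped = group_services_sketch(flat)
puts grouped["standard"].keys.inspect
```

Keeping one array per task and priority level is what lets the dispatch methods ask for a single wave at a time.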

Instance Attribute Details

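#background_service_timeout ⇒ Object

Configuration value: how long a service may stay queued or in progress before its thread is assumed dead and the dispatch is marked failed temporary. Defaults to 30 seconds.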



# File 'app/models/collection.rb', line 21

def background_service_timeout
  @background_service_timeout
end

#logger ⇒ Object

Returns the value of attribute logger.



# File 'app/models/collection.rb', line 19

def logger
  @logger
end

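#requeue_failedtemporary_services_in ⇒ Object

Configuration value: how long a failed temporary dispatch is kept before it is deleted so that the service can be queued again. Defaults to 500 seconds.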



# File 'app/models/collection.rb', line 21

def requeue_failedtemporary_services_in
  @requeue_failedtemporary_services_in
end

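#response_expire_crontab_format ⇒ Object

Configuration value: optional crontab-format schedule. A completed dispatch is treated as expired once the next scheduled time after its creation has passed. Defaults to nil.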



# File 'app/models/collection.rb', line 21

def response_expire_crontab_format
  @response_expire_crontab_format
end

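#response_expire_interval ⇒ Object

Configuration value: maximum age of a completed dispatch before it is treated as expired and its service is run again. Defaults to 1 day.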



# File 'app/models/collection.rb', line 21

def response_expire_interval
  @response_expire_interval
end

#umlaut_request ⇒ Object

Returns the value of attribute umlaut_request.



# File 'app/models/collection.rb', line 18

def umlaut_request
  @umlaut_request
end

Instance Method Details

#completed_dispatch_expired?(ds) ⇒ Boolean

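Checks whether a completed DispatchedService is too old to reuse, based on response_expire_interval and response_expire_crontab_format.

Returns:

  • (Boolean) truthy if the dispatch has expired under either setting; nil if neither setting is configured.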


# File 'app/models/collection.rb', line 274

def completed_dispatch_expired?(ds)
  interval = self.response_expire_interval
  crontab = self.response_expire_crontab_format
  now = Time.now

  return nil unless interval || crontab

  expired_interval = interval && (now - ds.created_at > interval)
  expired_crontab = crontab && (now > CronTab.new(crontab).nexttime(ds.created_at))

  return expired_interval || expired_crontab
end
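
The interval half of this check can be sketched in plain Ruby. ExpiringDispatch is a stand-in Struct rather than the DispatchedService model, interval_expired_sketch? is a hypothetical name, and the crontab half is left out because it depends on an external CronTab class.

```ruby
# Stand-in for a DispatchedService record; only created_at matters here.
ExpiringDispatch = Struct.new(:created_at)

# Returns nil when no interval is configured, otherwise true or false.
def interval_expired_sketch?(dispatch, interval, now = Time.now)
  return nil unless interval
  (now - dispatch.created_at) > interval
end

one_day = 24 * 60 * 60
now = Time.now
fresh = ExpiringDispatch.new(now - 60)
stale = ExpiringDispatch.new(now - 2 * one_day)

puts interval_expired_sketch?(fresh, one_day, now)  # false
puts interval_expired_sketch?(stale, one_day, now)  # true
```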

#dispatch_background! ⇒ Object

Runs the queued background services in priority waves ('a' through 'z'). If services from an earlier wave are still running, services in subsequent waves are not run until those are done; this guards against multiple HTTP requests arriving while services are in progress.

Returns the Thread object used for dispatching background services.



# File 'app/models/collection.rb', line 103

def dispatch_background!
  # Now we do some crazy magic, start a Thread to run our background
  # services. We are NOT going to wait for this thread to join,
  # we're going to let it keep doing its thing in the background after
  # we return a response to the browser
  backgroundThread = Thread.new(self, umlaut_request) do | t_collection,  t_request|

    # Tell our AR extension not to allow implicit checkouts
    ActiveRecord::Base.forbid_implicit_checkout_for_thread! if ActiveRecord::Base.respond_to?("forbid_implicit_checkout_for_thread!")

    begin
      # Set priority to lower for background thread; may or may not
      # actually have an effect in MRI, unclear, but can't hurt.
      prior = Thread.current.priority
      Thread.current.priority = prior - 1

      # Try to give the thread scheduler another hint, really, get
      # other stuff done before this thread.
      Thread.pass

      force_refresh = false

      ('a'..'z').each do | priority |
        # force refresh only if we just ran some services, otherwise not enough
        # time has gone by to be worthwhile. 
        runnable_ids = runnable_services_for_priority(priority, :refresh => force_refresh)

        services_to_run = self.instantiate_services!(:level => priority, :ids => runnable_ids)
        
        if services_to_run.empty?
          force_refresh = false
          next
        end

        ServiceWave.new(services_to_run , priority).handle(umlaut_request, umlaut_request.session_id)
        force_refresh = true
      end
    rescue Exception => e
      # We are divorced from any HTTP request at this point, and may not
      # have access to an ActiveRecord connection. Not much
      # we can do except log it.
      # If we're catching an exception here, service processing was
      # probably interrupted, which is bad. You should not intentionally
      # raise exceptions to be caught here.
      #
      # Normally even unexpected exceptions were caught inside the ServiceWave,
      # and logged to db as well as logfile if possible, only bugs in ServiceWave
      # itself should wind up caught here.
      Thread.current[:exception] = e
      logger.error("Background Service execution exception: #{e.inspect}\n   " + Umlaut::Util.clean_backtrace(e).join("\n   "))

      # One exception is in test environment, when we may be intentionally
      # trying to get exceptions to propagate up from ServiceWave to here,
      # and then onward, in order to be caught by tests. 
      if self.forward_background_exceptions
        raise e
      end
    end
  end
end
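
The pattern used here (start a thread, walk the priority waves in order, and record any exception on the thread instead of raising it to a caller) can be sketched without Umlaut's classes. run_waves_in_background_sketch is a hypothetical name, lambdas stand in for services, and each wave is run sequentially for simplicity, which is not necessarily how ServiceWave handles it.

```ruby
# waves: { "a" => [callable, ...], "b" => [...] }; each callable stands in
# for a service. Results are appended to log in the order they finish.
def run_waves_in_background_sketch(waves, log = [])
  Thread.new do
    begin
      ("a".."z").each do |priority|
        services = waves.fetch(priority, [])
        next if services.empty?
        # Finish the whole wave before moving on to the next priority.
        services.each { |service| log << service.call }
      end
    rescue StandardError => e
      # No caller is waiting on this thread, so keep the error on the thread.
      Thread.current[:exception] = e
    end
  end
end

log = []
waves = {
  "b" => [-> { "second wave" }],
  "a" => [-> { "first wave" }, -> { "also first wave" }]
}
thread = run_waves_in_background_sketch(waves, log)
thread.join # only for this demo; dispatch_background! does not join
puts log.inspect
```

A caller that later gets hold of the thread can inspect thread[:exception] to see whether the run was interrupted.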

#dispatch_foreground! ⇒ Object

Runs the queued foreground services in priority waves (0 through 9), then reloads the UmlautRequest object in the current thread to pick up any changes made in service threads.



# File 'app/models/collection.rb', line 83

def dispatch_foreground!
  # Foreground services
  (0..9).each do | priority |
    services_to_run = self.instantiate_services!(:level => priority, :ids => runnable_services_for_priority(priority))
    next if services_to_run.empty?
    ServiceWave.new(services_to_run , priority).handle(umlaut_request, umlaut_request.session_id)
  end

  # Need to reload the request from db, so it gets changes
  # made by services in threads, so future code (such as view rendering)
  # will see changes.
  umlaut_request.reload
end

#dispatch_services! ⇒ Object

Starts running all services that are in this collection, for the given request set for this collection, if and only if they are not already in progress.

This method can be run on a request multiple times; it’ll only re-execute services that are executable (not already running, or timed out). That characteristic is used when this method is called on a page refresh or background update status check.

Sets all services in the collection to have a ‘queued’ status if appropriate. Then actually executes the services that are dispatchable (queued).

Returns the Thread object used for dispatching background services.



# File 'app/models/collection.rb', line 70

def dispatch_services!
  freshen_dispatches!
  mark_queued_if_empty!
  
  dispatch_foreground!

  # return main thread for background services.
  return dispatch_background!
end

#freshen_dispatches! ⇒ Object

Goes through existing DispatchedService objects, and freshens them up:

  • If a service has been marked queued or in progress for longer than the timeout, mark it failed temporary.

  • If a completed dispatch has expired, delete the dispatch (and all its responses), so it can be re-queued.

  • If an existing failed temporary is older than our resurrection time, delete the dispatch (and all its responses), so it can be re-queued.



# File 'app/models/collection.rb', line 169

def freshen_dispatches!
  umlaut_request.dispatched_services.each do | ds |
    # go through dispatched_services and set still in progress but too long to failed temporary
    if ( (ds.status == DispatchedService::InProgress ||
          ds.status == DispatchedService::Queued ) &&
          (Time.now - ds.updated_at) > self.background_service_timeout)

          ds.store_exception( Exception.new("background service timed out (took longer than #{self.background_service_timeout} to run); thread assumed dead.")) unless ds.exception_info
          # Fail it temporary, it'll be run again.
          ds.status = DispatchedService::FailedTemporary
          ds.save!
          logger.warn("Background service timed out, thread assumed dead. #{umlaut_request.id} / #{ds.service_id}")
     end



    # go through dispatched_services and delete:
    # 1) old completed dispatches, too old to use.
    # 2) failedtemporary dispatches that are older than our resurrection time
    # -> And all responses associated with those dispatches.
    # After being deleted, they'll end up re-queued.
    if ( (ds.completed? && completed_dispatch_expired?(ds) ) ||
         (  ds.status == DispatchedService::FailedTemporary &&
           (Time.now - ds.updated_at) > self.requeue_failedtemporary_services_in
          )
        )

      # Need to expire. Delete all the service responses, and
      # the DispatchedService record, and service will be automatically
      # run again.
      serv_id = ds.service_id

      umlaut_request.service_responses.each do |response|
        if response.service_id == serv_id
          umlaut_request.service_responses.delete(response)
          response.destroy
        end
      end

      umlaut_request.dispatched_services.destroy(ds)            
    end
  end
end
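
The two time-based transitions above can be sketched with plain structs. FreshenDispatch and freshen_sketch are hypothetical names, statuses are plain symbols rather than DispatchedService constants, and the sketch assumes that saving a record refreshes its updated_at. The expiry of completed dispatches is left out.

```ruby
# Stand-in for a DispatchedService row; status values are plain symbols here.
FreshenDispatch = Struct.new(:service_id, :status, :updated_at)

# Returns the dispatches that survive freshening. Stalled ones are flipped to
# :failed_temporary; failed-temporary ones older than requeue_in are dropped
# so that a later pass can queue them again.
def freshen_sketch(dispatches, timeout:, requeue_in:, now: Time.now)
  dispatches.each do |ds|
    stalled = [:in_progress, :queued].include?(ds.status) && (now - ds.updated_at) > timeout
    next unless stalled
    ds.status = :failed_temporary
    ds.updated_at = now # assumption: saving the record touches updated_at
  end

  dispatches.reject do |ds|
    ds.status == :failed_temporary && (now - ds.updated_at) > requeue_in
  end
end

now = Time.now
dispatches = [
  FreshenDispatch.new("slow_service", :in_progress, now - 60),
  FreshenDispatch.new("old_failure", :failed_temporary, now - 600)
]
kept = freshen_sketch(dispatches, timeout: 30, requeue_in: 500, now: now)
puts kept.map { |ds| [ds.service_id, ds.status] }.inspect
```

Because a freshly failed dispatch gets a new updated_at, it waits out the full requeue interval before being deleted and re-queued.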

#get_service_definitions(options = {}) ⇒ Object

Get service definition hashes for services in this collection, filtered by options and returned in an array. This returns a mutable array that Collection mutates internally, but clients really ought not to mutate it. Options:

  • :task => Service::StandardTask (default) or Service::LinkOutFilterTask.

  • :level => priority level; defaults to returning services from all levels.

  • :ids => list of unique service ids; return only these.



# File 'app/models/collection.rb', line 317

def get_service_definitions(options = {})
  options[:task] ||= Service::StandardTask

  configs_for_task = @service_definitions[ options[:task] ] || {}

  service_configs = case options[:level]
                    when nil
                      # All of them for this task
                      configs_for_task.values.flatten
                    else
                      configs_for_task[ options[:level] ] || []
                    end
  if options[:ids]
    service_configs = service_configs.find_all {|s| options[:ids].include? s["service_id"] }
  end

  return service_configs
end
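
To illustrate how the three options combine, here is a self-contained sketch over a hand-built two-level hash. SKETCH_DEFINITIONS, service_definitions_sketch, the "standard" task key, and the service ids are all made up for the example; the real method reads @service_definitions and uses Service task constants.

```ruby
# Made-up data in the { task => { priority => [definitions] } } layout.
SKETCH_DEFINITIONS = {
  "standard" => {
    1   => [{ "service_id" => "SFX" }, { "service_id" => "Catalog" }],
    "a" => [{ "service_id" => "Amazon" }]
  }
}.freeze

# Filters by task first, then by level (nil means all levels), then by ids.
def service_definitions_sketch(definitions, task: "standard", level: nil, ids: nil)
  for_task = definitions[task] || {}
  configs = level.nil? ? for_task.values.flatten : (for_task[level] || [])
  configs = configs.select { |c| ids.include?(c["service_id"]) } if ids
  configs
end

puts service_definitions_sketch(SKETCH_DEFINITIONS).length                       # 3
puts service_definitions_sketch(SKETCH_DEFINITIONS, level: "a").length           # 1
puts service_definitions_sketch(SKETCH_DEFINITIONS, ids: ["Catalog"]).length     # 1
```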

#instantiate_services!(options = {}) ⇒ Object

Instantiate new copies of services included in this collection. Which services are instantiated is specified by options, which can be combined:

  • :task => Service::StandardTask (default) or Service::LinkOutFilterTask.

  • :level => priority level; defaults to returning services from all levels.

  • :ids => list of ids; only those.



# File 'app/models/collection.rb', line 295

def instantiate_services!(options ={})
  get_service_definitions(options).collect do |svc_def|
    ServiceStore.instantiate_service!(svc_def, umlaut_request)
  end
end

#link_out_service_level(level) ⇒ Object

Deprecated, use #instantiate_services! with :task => Service::LinkOutFilterTask.



# File 'app/models/collection.rb', line 303

def link_out_service_level(level)
  instantiate_services!(:task => Service::LinkOutFilterTask,
                        :level => level)
end

#mark_queued_if_empty! ⇒ Object

For every configured service that has no DispatchedService object yet, creates one with status Queued.



# File 'app/models/collection.rb', line 215

def mark_queued_if_empty!
  our_service_ids = self.get_service_definitions.collect {|d| d["service_id"]}

  existing_dispatches = umlaut_request.dispatched_services.collect {|d| d.service_id}

  not_yet_existing = our_service_ids - existing_dispatches

  not_yet_existing.each do |service_id|
    umlaut_request.new_dispatch_object!(service_id, DispatchedService::Queued).save!
  end    
end
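
The core of this method is an array difference between configured ids and already dispatched ids. A minimal sketch, with the hypothetical helper ids_needing_queue_sketch and made-up service ids:

```ruby
# Returns the ids of configured services that have no dispatch record yet.
def ids_needing_queue_sketch(configured_ids, dispatched_ids)
  configured_ids - dispatched_ids
end

configured = %w[SFX Amazon Catalog]
puts ids_needing_queue_sketch(configured, %w[SFX]).inspect  # ["Amazon", "Catalog"]
puts ids_needing_queue_sketch(configured, configured).inspect  # []
```

Once every configured service has a dispatch record, the difference is empty, which is why calling the method repeatedly on the same request creates nothing new.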

#runnable_services_for_priority(priority, options = {}) ⇒ Object

Returns all services for the given priority that are marked Queued, so long as no previous waves are still marked running.

Pass :refresh => true in the options hash (the second argument) to force a trip to the database to get fresh DispatchedService objects.

Returns an array of service ids, or an empty array.



# File 'app/models/collection.rb', line 234

def runnable_services_for_priority(priority, options = {})
  DispatchedService.connection_pool.with_connection do
    service_definitions = self.get_service_definitions

    # Make a hash where key is service id, and value is priority.to_s
    service_to_priority = Hash[
      service_definitions.collect do |d|
        [ d["service_id"], d["priority"].to_s ]
      end
    ]

    if options[:refresh]
      # force a refresh
      umlaut_request.dispatched_services(true)
    end

    # If there is any service earlier than this wave still marked InProgress, 
    # we're not ready to run this wave, return empty array. 
    # Important to avoid race condition on HTTP requests, don't
    # dispatch later background waves unless earlier are actually complete,
    # even on an HTTP status check. 
    previous_waves_running = umlaut_request.dispatched_services.find do |ds|
      ds.status == DispatchedService::InProgress &&
      service_to_priority[ ds.service_id ] < priority.to_s
    end.present?
    return [] if previous_waves_running

    # otherwise, the services for this priority are runnable if
    # they are already marked Queued
    # We use .to_a, we want to use the already in memory array, not
    # go to the db here. 
    return umlaut_request.dispatched_services.to_a.find_all do |ds|
      ds.status == DispatchedService::Queued &&
      service_to_priority[ ds.service_id ] == priority.to_s
    end.collect {|ds| ds.service_id}
  end
end
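
The wave gating can be sketched with plain structs. WaveDispatch and runnable_ids_sketch are hypothetical names and statuses are plain symbols. As in the method above, priorities are compared as strings, so "0" through "9" sort before "a" through "z".

```ruby
# Stand-in for a DispatchedService record.
WaveDispatch = Struct.new(:service_id, :status)

# priorities: { service_id => priority }. Every dispatched service id is
# assumed to have an entry; fetch raises KeyError otherwise.
def runnable_ids_sketch(dispatches, priorities, priority)
  wanted = priority.to_s
  earlier_running = dispatches.any? do |ds|
    ds.status == :in_progress && priorities.fetch(ds.service_id).to_s < wanted
  end
  # An earlier wave is still running, so nothing in this wave may start yet.
  return [] if earlier_running

  dispatches
    .select { |ds| ds.status == :queued && priorities.fetch(ds.service_id).to_s == wanted }
    .map(&:service_id)
end

priorities = { "SFX" => 1, "Amazon" => "a", "Covers" => "b" }
dispatches = [
  WaveDispatch.new("SFX", :successful),
  WaveDispatch.new("Amazon", :in_progress),
  WaveDispatch.new("Covers", :queued)
]

puts runnable_ids_sketch(dispatches, priorities, "b").inspect  # []
dispatches[1].status = :successful
puts runnable_ids_sketch(dispatches, priorities, "b").inspect  # ["Covers"]
```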