Class: Collection

Inherits:
Object
Defined in:
app/models/collection.rb

Overview

A Collection object encapsulates a given UmlautRequest, and a given list of Umlaut services that should be run off that request.

That’s exactly what it’s initialized with: an UmlautRequest and a list of service definitions. An optional third parameter takes an Umlaut configuration object that supplies various timeout values. If you don’t pass one in, defaults are used.

The Collection holds and executes the logic for running those services, foreground and background, making sure no service is run twice if it’s already in progress, timing out expired services, etc.

Constructor Details

#initialize(a_umlaut_request, service_hash, config = Confstruct::Configuration.new) ⇒ Collection

Parameters:

  • a_umlaut_request: an UmlautRequest, representing a request for services for a context object.
  • service_hash: a hash of hashes with service definitions, as would be in services.yml.
  • config: a Confstruct::Configuration associated with the current controller, holding a few options relevant to collection service execution. If you don’t pass one in, a blank one with default values is used.



# File 'app/models/collection.rb', line 33

def initialize(a_umlaut_request, service_hash, config = Confstruct::Configuration.new) 
  self.umlaut_request = a_umlaut_request

  self.logger = Rails.logger

  self.response_expire_interval = config.lookup!("response_expire_interval", 1.day)
  self.response_expire_crontab_format = config.lookup!("response_expire_crontab_format", nil)
  self.background_service_timeout =  config.lookup!("background_service_timeout", 30.seconds)
  self.requeue_failedtemporary_services_in = config.lookup!("requeue_failedtemporary_services_in", 500.seconds)

  # @service_definitions will be a two-level hash, pointing to an array.. Task is Standard, LinkOut, etc.
  # { [task] => { [priority_level] => [config1, config2, config3],
  #                [priority_level_2] => [configa], }
  #     [...]
  # }
  @service_definitions_flat = service_hash
  @service_definitions = {}

  # Arrange services by type and priority in @service_definitions
  gather_services
end
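
The two-level shape described in the constructor comment can be illustrated with plain hashes. gather_services itself is not shown on this page, so what follows is only a sketch of the target structure. The helper name group_definitions, the "task" and "priority" keys, the default task name, and the sample services are all assumptions made for illustration.

```ruby
# Hypothetical helper: arrange a flat hash of service definitions into
#   { task => { priority => [definition, ...] } }
# The "task" and "priority" keys and the default task name are assumptions.
def group_definitions(service_hash, default_task: "standard")
  grouped = {}
  service_hash.each do |service_id, definition|
    # Carry the id inside the definition so later lookups can filter on it.
    entry    = definition.merge("service_id" => service_id)
    task     = entry.fetch("task", default_task)
    priority = entry.fetch("priority")
    ((grouped[task] ||= {})[priority] ||= []) << entry
  end
  grouped
end

flat = {
  "sfx"      => { "priority" => 1 },
  "amazon"   => { "priority" => "a" },
  "worldcat" => { "priority" => 1 },
  "filter"   => { "priority" => 2, "task" => "link_out_filter" }
}

grouped = group_definitions(flat)
grouped["standard"][1].map { |d| d["service_id"] }   # ["sfx", "worldcat"]
```

Keeping the flat hash untouched and building the grouped view separately mirrors the split between @service_definitions_flat and @service_definitions in the constructor.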

Instance Attribute Details

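#background_service_timeout ⇒ Object

Configuration value: how long a service may stay queued or in progress without an update before prepare_for_dispatch! marks it failed (temporary) and assumes its thread is dead. Defaults to 30 seconds.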



# File 'app/models/collection.rb', line 19

def background_service_timeout
  @background_service_timeout
end

#logger ⇒ Object

Returns the value of attribute logger.



# File 'app/models/collection.rb', line 17

def logger
  @logger
end

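#requeue_failedtemporary_services_in ⇒ Object

Configuration value: how long a dispatch must have been in the failed (temporary) state before prepare_for_dispatch! deletes it so the service is queued again. Defaults to 500 seconds.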



# File 'app/models/collection.rb', line 19

def requeue_failedtemporary_services_in
  @requeue_failedtemporary_services_in
end

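#response_expire_crontab_format ⇒ Object

Configuration value: a crontab-format string. A completed dispatch counts as expired once the current time passes the next crontab time after the dispatch was created. Defaults to nil (no crontab-based expiry).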



# File 'app/models/collection.rb', line 19

def response_expire_crontab_format
  @response_expire_crontab_format
end

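#response_expire_interval ⇒ Object

Configuration value: the maximum age of a completed dispatch before its responses are considered too old to re-use. Defaults to 1 day.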



# File 'app/models/collection.rb', line 19

def response_expire_interval
  @response_expire_interval
end

#umlaut_request ⇒ Object

Returns the value of attribute umlaut_request.



# File 'app/models/collection.rb', line 16

def umlaut_request
  @umlaut_request
end

Instance Method Details

#completed_dispatch_expired?(ds) ⇒ Boolean

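Reports whether a completed DispatchedService (ds) is too old to re-use, judged by response_expire_interval and/or response_expire_crontab_format.

Returns:

  • (Boolean) true if either configured rule says the dispatch has expired; a falsy value (false or nil) otherwise, including nil when neither rule is configured.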


# File 'app/models/collection.rb', line 241

def completed_dispatch_expired?(ds)
  interval = self.response_expire_interval
  crontab = self.response_expire_crontab_format
  now = Time.now

  return nil unless interval || crontab

  expired_interval = interval && (now - ds.created_at > interval)
  expired_crontab = crontab && (now > CronTab.new(crontab).nexttime(ds.created_at))

  return expired_interval || expired_crontab
end
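
The interval half of this check can be tried in isolation. The sketch below is a simplified stand-in, not the method itself: it leaves out the crontab rule, uses a Struct in place of a DispatchedService record, and expresses the interval in plain seconds. The names ExpiryRow and expired_by_interval? are made up for the example.

```ruby
# Stand-in for a DispatchedService row; only created_at matters here.
ExpiryRow = Struct.new(:created_at)

# Interval-only version of the expiry check. `interval` is in seconds;
# nil means no interval is configured, in which case nil is returned.
def expired_by_interval?(row, interval, now = Time.now)
  return nil unless interval
  (now - row.created_at) > interval
end

now     = Time.now
one_day = 86_400
fresh   = ExpiryRow.new(now - 60)           # one minute old
stale   = ExpiryRow.new(now - 2 * one_day)  # two days old

expired_by_interval?(fresh, one_day, now)   # false
expired_by_interval?(stale, one_day, now)   # true
expired_by_interval?(stale, nil, now)       # nil: nothing configured
```

Passing `now` in explicitly keeps the comparison deterministic, which is convenient when testing expiry rules.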

#dispatch_background!(queued_service_ids) ⇒ Object

Call prepare_for_dispatch! first; its return value is suitable as the argument here. queued_service_ids is a list of service ids already identified as suitable for running and marked queued in the DispatchedService table.

Runs those services in background priority waves (levels 'a' through 'z'), inside a new thread that the caller does not wait on.

Returns the Thread object used for dispatching background services.



# File 'app/models/collection.rb', line 107

def dispatch_background!(queued_service_ids)
  # Now we do some crazy magic, start a Thread to run our background
  # services. We are NOT going to wait for this thread to join,
  # we're going to let it keep doing its thing in the background after
  # we return a response to the browser
  backgroundThread = Thread.new(self, umlaut_request) do | t_collection,  t_request|

    # Tell our AR extension not to allow implicit checkouts
    ActiveRecord::Base.forbid_implicit_checkout_for_thread! if ActiveRecord::Base.respond_to?("forbid_implicit_checkout_for_thread!")

    begin
      # Set priority to lower for background thread; may or may not
      # actually have an effect in MRI, unclear, but can't hurt.
      prior = Thread.current.priority
      Thread.current.priority = prior - 1

      # Try to give the thread scheduler another hint, really, get
      # other stuff done before this thread.
      Thread.pass


      ('a'..'z').each do | priority |
        services_to_run = self.instantiate_services!(:level => priority, :ids => queued_service_ids)
        next if services_to_run.empty?
        ServiceWave.new(services_to_run , priority).handle(umlaut_request, umlaut_request.session_id)
      end
    rescue Exception => e
      # We are divorced from any HTTP request at this point, and may not
      # have access to an ActiveRecord connection. Not much
      # we can do except log it.
      # If we're catching an exception here, service processing was
      # probably interrupted, which is bad. You should not intentionally
      # raise exceptions to be caught here.
      #
      # Normally even unexpected exceptions were caught inside the ServiceWave,
      # and logged to db as well as logfile if possible, only bugs in ServiceWave
      # itself should wind up caught here.
      Thread.current[:exception] = e
      logger.error("Background Service execution exception: #{e.inspect}\n   " + Umlaut::Util.clean_backtrace(e).join("\n   "))

      # One exception is in test environment, when we may be intentionally
      # trying to get exceptions to propagate up from ServiceWave to here,
      # and then onward, in order to be caught by tests. 
      if self.forward_background_exceptions
        raise e
      end
    end
  end
end
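
The pattern used here (start a thread, work through the levels in order, and park any escaping exception on the thread for later inspection) can be sketched without Rails. Everything below is illustrative: run_wave and dispatch_in_background are hypothetical names, the failure at level "c" is simulated, and the sketch rescues StandardError where the method above rescues Exception.

```ruby
# Hypothetical stand-in for running one wave of services; raises at
# level "c" to simulate a bug escaping the wave.
def run_wave(level, log)
  raise "boom at #{level}" if level == "c"
  log << level
end

def dispatch_in_background(levels, log)
  Thread.new do
    begin
      levels.each { |level| run_wave(level, log) }
    rescue StandardError => e
      # Record the failure on the thread so the caller can inspect it later.
      Thread.current[:exception] = e
    end
  end
end

log    = []
thread = dispatch_in_background(("a".."e").to_a, log)
thread.join   # joined here only so the result can be inspected

log                          # ["a", "b"]
thread[:exception].message   # "boom at c"
```

Because the exception is stored rather than re-raised, joining the thread does not raise; levels after the failing one are simply never run.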

#dispatch_foreground!(queued_service_ids) ⇒ Object

Call prepare_for_dispatch! first; its return value is suitable as the argument here. queued_service_ids is a list of service ids already identified as suitable for running and marked queued in the DispatchedService table.

Runs those services in foreground priority waves (levels 0 through 9), then reloads the UmlautRequest object in the current thread to pick up any changes made in service threads.



# File 'app/models/collection.rb', line 85

def dispatch_foreground!(queued_service_ids)
  # Foreground services
  (0..9).each do | priority |
    services_to_run = self.instantiate_services!(:level => priority, :ids => queued_service_ids)
    next if services_to_run.empty?
    ServiceWave.new(services_to_run , priority).handle(umlaut_request, umlaut_request.session_id)
  end

  # Need to reload the request from db, so it gets changes
  # made by services in threads, so future code (such as view rendering)
  # will see changes.
  umlaut_request.reload
end
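
The wave ordering can be pictured with a small stand-alone sketch. ServiceWave is not shown on this page, so the sketch assumes that services within one wave run concurrently and that a wave finishes before the next level starts; the service names and the services_by_level hash are invented for the example.

```ruby
# Hypothetical services grouped by foreground priority level (0..9).
services_by_level = {
  3 => ["catalog_lookup"],
  1 => ["resolver", "fulltext_check"]
}

completed = []
lock      = Mutex.new

(0..9).each do |level|
  wave = services_by_level.fetch(level, [])
  next if wave.empty?

  # Assumption: services in one wave run concurrently, and the wave
  # is fully joined before the next level begins.
  threads = wave.map do |name|
    Thread.new { lock.synchronize { completed << [level, name] } }
  end
  threads.each(&:join)
end

completed.map(&:first)   # [1, 1, 3]
```

The order of the two level-1 entries is not guaranteed, but every level-1 service is recorded before the level-3 service starts.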

#dispatch_services! ⇒ Object

Starts running all services that are in this collection, for the given request set for this collection, if and only if they are not already in progress.

This method can be run on a request multiple times; it only re-executes services that are executable (not already running, or timed out). That characteristic is used when this method is called on a page refresh or a background update status check.

Sets all services in collection to have a ‘queued’ status if appropriate. Then actually executes the services that are dispatchable (queued).

Returns the Thread object used for dispatching background services



# File 'app/models/collection.rb', line 68

def dispatch_services!
  queued_service_ids = prepare_for_dispatch!

  dispatch_foreground!(queued_service_ids)

  # return main thread for background services.
  return dispatch_background!(queued_service_ids)
end

#get_service_definitions(options = {}) ⇒ Object

Gets service definition hashes for services in this institution, returned in an array. The result may be a mutable array that Collection uses internally, so clients ought not to mutate it.

Options:

  • :task => Service::StandardTask (default) or Service::LinkOutFilterTask.
  • :level => priority level; defaults to returning services from all levels.
  • :ids => list of service unique ids; return only these.



# File 'app/models/collection.rb', line 284

def get_service_definitions(options = {})
  options[:task] ||= Service::StandardTask

  configs_for_task = @service_definitions[ options[:task] ] || {}

  service_configs = case options[:level]
                    when nil
                      # All of them for this task
                      configs_for_task.values.flatten
                    else
                      configs_for_task[ options[:level] ] || []
                    end
  if options[:ids]
    service_configs = service_configs.find_all {|s| options[:ids].include? s["service_id"] }
  end

  return service_configs
end
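
To see how the three options combine, here is a stand-alone sketch over sample data. It is an approximation of the lookup, not the method itself: select_definitions is a hypothetical name, the task keys are plain strings standing in for constants such as Service::StandardTask, and the service ids are invented.

```ruby
# Sample two-level structure: task => priority level => definitions.
definitions = {
  "standard" => {
    1   => [{ "service_id" => "resolver" }, { "service_id" => "catalog" }],
    "a" => [{ "service_id" => "covers" }]
  }
}

def select_definitions(definitions, task: "standard", level: nil, ids: nil)
  by_level = definitions.fetch(task, {})
  # No level given: take every definition for the task.
  configs = level.nil? ? by_level.values.flatten : by_level.fetch(level, [])
  # Optional id filter, applied after the level filter.
  configs = configs.select { |c| ids.include?(c["service_id"]) } if ids
  configs
end

select_definitions(definitions).map { |c| c["service_id"] }
# ["resolver", "catalog", "covers"]
select_definitions(definitions, level: 1, ids: ["catalog"])
# [{ "service_id" => "catalog" }]
select_definitions(definitions, task: "link_out_filter")
# []
```

An unknown task or level yields an empty array rather than nil, so callers can iterate over the result without a nil check.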

#instantiate_services!(options = {}) ⇒ Object

Instantiates new copies of services included in this collection. Which services are instantiated is specified by options, which can be combined:

  • :task => Service::StandardTask (default) or Service::LinkOutFilterTask.
  • :level => priority level; defaults to returning services from all levels.
  • :ids => list of ids; only those.



# File 'app/models/collection.rb', line 262

def instantiate_services!(options ={})
  get_service_definitions(options).collect do |svc_def|
    ServiceStore.instantiate_service!(svc_def, umlaut_request)
  end
end

#link_out_service_level(level) ⇒ Object

Deprecated, use #instantiate_services! with :task => Service::LinkOutFilterTask.



# File 'app/models/collection.rb', line 270

def link_out_service_level(level)
  instantiate_services!(:task => Service::LinkOutFilterTask,
                        :level => level)
end

#prepare_for_dispatch! ⇒ Object

Goes through all services and marks them with a DispatchedService record in ‘queued’ state.

Will time out any too-old services in a running state.

Will remove DispatchedService status for any services marked failed that are old enough to re-run, or services that are too old to re-use. Such services are then queuable.

Returns an array of service identifiers for services that are now queued and executable.



# File 'app/models/collection.rb', line 169

def prepare_for_dispatch!
  # Go through currently dispatched services, looking for timed out
  # services -- services still in progress that have taken too long,
  # as well as service responses that are too old to be used.

  queued_service_ids = []
  DispatchedService.transaction do
    umlaut_request.dispatched_services.each do | ds |
        # mark dispatches that are still queued or in progress, but have taken too long, as failed temporary
        if ( (ds.status == DispatchedService::InProgress ||
              ds.status == DispatchedService::Queued ) &&
              (Time.now - ds.updated_at) > self.background_service_timeout)

              ds.store_exception( Exception.new("background service timed out (took longer than #{self.background_service_timeout} to run); thread assumed dead.")) unless ds.exception_info
              # Fail it temporary, it'll be run again.
              ds.status = DispatchedService::FailedTemporary
              ds.save!
              logger.warn("Background service timed out, thread assumed dead. #{umlaut_request.id} / #{ds.service_id}")
         end



        # go through dispatched_services and delete:
        # 1) old completed dispatches, too old to use.
        # 2) failedtemporary dispatches that are older than our resurrection time
        # -> And all responses associated with those dispatches.
        # After being deleted, they'll end up re-queued.
        if ( (ds.completed? && completed_dispatch_expired?(ds) ) ||
             (  ds.status == DispatchedService::FailedTemporary &&
               (Time.now - ds.updated_at) > self.requeue_failedtemporary_services_in
              )
            )

          # Need to expire. Delete all the service responses, and
          # the DispatchedService record, and service will be automatically
          # run again.
          serv_id = ds.service_id

          umlaut_request.service_responses.each do |response|
            if response.service_id == serv_id
              umlaut_request.service_responses.delete(response)
              response.destroy
            end
          end

          umlaut_request.dispatched_services.destroy(ds)            
        end
    end

    # Queue any services without a dispatch marker at all, keeping
    # track of queued services, already existing or newly created.

    # Just in case, we're going to refetch dispatched_services from the db,
    # in case some other http request or background service updated things
    # recently.
    umlaut_request.dispatched_services.reset

    self.get_service_definitions.each do |service|
      service_id = service['service_id']
      # use in-memory #to_a search, don't go to db each time!
      if found = umlaut_request.dispatched_services.to_a.find {|s| s.service_id == service_id}
        queued_service_ids.push(service_id) if found.status == DispatchedService::Queued
      else
        umlaut_request.new_dispatch_object!(service_id, DispatchedService::Queued).save!
        queued_service_ids.push(service_id)
      end
    end
  end

  return queued_service_ids
end
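
The per-row decisions made in the first loop can be summarized in a small sketch. It is a simplification under stated assumptions: statuses are plain symbols rather than the DispatchedService constants, only the interval rule is used for expiry, :completed stands in for whatever completed? covers, each row gets a single action, and DispatchRow and triage are hypothetical names.

```ruby
# Stand-in for a DispatchedService row.
DispatchRow = Struct.new(:status, :updated_at, :created_at)

# Returns the action this sketch would take for one row:
#   :fail_temporarily    queued or in-progress row that has gone stale
#   :delete_and_requeue  expired result, or a temporary failure old enough to retry
#   :keep                anything else
# All durations are in seconds; defaults mirror the configuration defaults.
def triage(row, now:, timeout: 30, retry_after: 500, expire_after: 86_400)
  idle = now - row.updated_at
  case row.status
  when :queued, :in_progress
    idle > timeout ? :fail_temporarily : :keep
  when :failed_temporary
    idle > retry_after ? :delete_and_requeue : :keep
  when :completed
    (now - row.created_at) > expire_after ? :delete_and_requeue : :keep
  else
    :keep
  end
end

now = Time.now
triage(DispatchRow.new(:in_progress, now - 45, now - 45), now: now)             # :fail_temporarily
triage(DispatchRow.new(:failed_temporary, now - 600, now - 900), now: now)      # :delete_and_requeue
triage(DispatchRow.new(:completed, now - 3_600, now - 3_600), now: now)         # :keep
```

Rows that end up deleted have no dispatch marker afterwards, which is what lets the second half of the method queue them again.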