Class: Collection

Inherits:
Object
  • Object
show all
Defined in:
app/models/collection.rb

Overview

A Collection object encapsulates a given UmlautRequest, and a given list of Umlaut services that should be run off that request.

That’s exactly what it’s initialized with: an umlaut request, and list of service definitions. Third parameter pass in an umlaut configuration object, to get various timeout values. If you don’t pass one in, defaults will be used.

The Collection holds and executes the logic for running those services, foreground and background, making sure no service is run twice if it’s already in progress, timing out expired services, etc.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(a_umlaut_request, service_hash, config = Confstruct::Configuration.new) ⇒ Collection

a_umlaut_request is an UmlautRequest, representing a request for services for a context object. service_hash is a hash of hashes with service definitions, as would be in services.yml config is a Confstruct::Configuration associated with the current controller, has a few config options in it relevant to collection service exec; but don’t pass in, we’ll use a blank one with default values, no prob.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'app/models/collection.rb', line 30

def initialize(a_umlaut_request, service_hash, config = Confstruct::Configuration.new)
  self.umlaut_request = a_umlaut_request

  self.logger = Rails.logger

  self.response_expire_interval = config.lookup!("response_expire_interval", 1.day)
  self.response_expire_crontab_format = config.lookup!("response_expire_crontab_format", nil)
  self.background_service_timeout =  config.lookup!("background_service_timeout", 30.seconds)
  self.requeue_failedtemporary_services = config.lookup!("requeue_failedtemporary_services", 500.seconds)

  # @service_definitions will be a two-level hash, pointing to an array.. Task is Standard, LinkOut, etc.
  # { [task] => { [priority_level] => [config1, config2, config3],
  #                [priority_level_2] => [configa], }
  #     [...]
  # }
  @service_definitions_flat = service_hash
  @service_definitions = {}

  # Arrange services by type and priority in @service_definitions
  gather_services
end

Instance Attribute Details

#background_service_timeoutObject

configs



19
20
21
# File 'app/models/collection.rb', line 19

def background_service_timeout
  @background_service_timeout
end

#loggerObject

Returns the value of attribute logger.



17
18
19
# File 'app/models/collection.rb', line 17

def logger
  @logger
end

#requeue_failedtemporary_servicesObject

configs



19
20
21
# File 'app/models/collection.rb', line 19

def requeue_failedtemporary_services
  @requeue_failedtemporary_services
end

#response_expire_crontab_formatObject

configs



19
20
21
# File 'app/models/collection.rb', line 19

def response_expire_crontab_format
  @response_expire_crontab_format
end

#response_expire_intervalObject

configs



19
20
21
# File 'app/models/collection.rb', line 19

def response_expire_interval
  @response_expire_interval
end

#umlaut_requestObject

Returns the value of attribute umlaut_request.



16
17
18
# File 'app/models/collection.rb', line 16

def umlaut_request
  @umlaut_request
end

Instance Method Details

#completed_dispatch_expired?(ds) ⇒ Boolean

Returns:

  • (Boolean)


224
225
226
227
228
229
230
231
232
233
234
235
# File 'app/models/collection.rb', line 224

def completed_dispatch_expired?(ds)
  interval = self.response_expire_interval
  crontab = self.response_expire_crontab_format
  now = Time.now

  return nil unless interval || crontab

  expired_interval = interval && (now - ds.created_at > interval)
  expired_crontab = crontab && (now > CronTab.new(crontab).nexttime(ds.created_at))

  return expired_interval || expired_crontab
end

#dispatch_background!(queued_service_ids) ⇒ Object

Call prepare_for_dispatch! first, the return value from that call is suitable as argument for this call: queued_service_ids, list of service id’s already identified as suitable for running, and marked queued in the DispatchedService table.

Will run such services in background priority waves.



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'app/models/collection.rb', line 99

def dispatch_background!(queued_service_ids)
  # Now we do some crazy magic, start a Thread to run our background
  # services. We are NOT going to wait for this thread to join,
  # we're going to let it keep doing it's thing in the background after
  # we return a response to the browser
  backgroundThread = Thread.new(self, umlaut_request) do | t_collection,  t_request|

    # Tell our AR extension not to allow implicit checkouts
    ActiveRecord::Base.forbid_implicit_checkout_for_thread! if ActiveRecord::Base.respond_to?("forbid_implicit_checkout_for_thread!")

    begin
      # Set priority to lower for background thread; may or may not
      # actually have an effect in MRI, unclear, but can't hurt.
      prior = Thread.current.priority
      Thread.current.priority = prior - 1

      # Try to give the thread scheduler another hint, really, get
      # other stuff done before this thread.
      Thread.pass


      ('a'..'z').each do | priority |
          services_to_run = self.instantiate_services!(:level => priority, :ids => queued_service_ids)
        next if services_to_run.empty?
        ServiceWave.new(services_to_run , priority).handle(umlaut_request, umlaut_request.session_id)
      end
    rescue Exception => e
      # We are divorced from any HTTP request at this point, and may not
      # have access to an ActiveRecord connection. Not much
      # we can do except log it.
      # If we're catching an exception here, service processing was
      # probably interrupted, which is bad. You should not intentionally
      # raise exceptions to be caught here.
      #
      # Normally even unexpected exceptions were caught inside the ServiceWave,
      # and logged to db as well as logfile if possible, only bugs in ServiceWave
      # itself should wind up caught here.
      Thread.current[:exception] = e
      logger.error("Background Service execution exception: #{e}\n\n   " + clean_backtrace(e).join("\n"))
    end
  end
end

#dispatch_foreground!(queued_service_ids) ⇒ Object

Call prepare_for_dispatch! first, the return value from that call is suitable as argument for this call: queued_service_ids, list of service id’s already identified as suitable for running, and marked queued in the DispatchedService table.

Will run such services in foreground priority waves. And then reload the UmlautRequest object in the current thread, to pick up any changes made in service threads.



79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'app/models/collection.rb', line 79

def dispatch_foreground!(queued_service_ids)
  # Foreground services
  (0..9).each do | priority |
    services_to_run = self.instantiate_services!(:level => priority, :ids => queued_service_ids)
    next if services_to_run.empty?
    ServiceWave.new(services_to_run , priority).handle(umlaut_request, umlaut_request.session_id)
  end

  # Need to reload the request from db, so it gets changes
  # made by services in threads, so future code (such as view rendering)
  # will see changes.
  umlaut_request.reload
end

#dispatch_services!Object

Starts running all services that are in this collection, for the given request set for this collection, if and only if they are not already in progress.

This method can be run on a request multiple times, it’ll only re-execute services that are executable (not already running, or timed out). That characteristic is used when this method is called on a page refresh or background update status check.

Sets all services in collection to have a ‘queued’ status if appropriate. Then actually executes the services that are dispatchable (queued).



63
64
65
66
67
68
69
# File 'app/models/collection.rb', line 63

def dispatch_services!
  queued_service_ids = prepare_for_dispatch!

  dispatch_foreground!(queued_service_ids)

  dispatch_background!(queued_service_ids)
end

#get_service_definitions(options = {}) ⇒ Object

Get service definition hashes for services in this institution. options, returned in an array. Does return a mutatable array that Collection mutates internally, but clients really ought not to mutate. :task => Service::StandardTask (default) or Service::LinkOutFilterTask :level => priority level, default to returning services from all. :ids => list of service unique ids, return only these.



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'app/models/collection.rb', line 267

def get_service_definitions(options = {})
  options[:task] ||= Service::StandardTask

  configs_for_task = @service_definitions[ options[:task] ] || {}

  service_configs = case options[:level]
                      when nil
                        # All of of them for this task
                        configs_for_task.values.flatten
                      else
                        configs_for_task[ options[:level] ] || []
                    end
   if options[:ids]
     service_configs = service_configs.find_all {|s| options[:ids].include? s["service_id"] }
   end

   return service_configs
end

#instantiate_services!(options = {}) ⇒ Object

Instantiate new copies of services included in this collection, which services specified by options, can combine: :task => Service::StandardTask (default) or Service::LinkOutFilterTask :level => priority level, default to returning services from all. :ids => list of id’s, only those.



245
246
247
248
249
# File 'app/models/collection.rb', line 245

def instantiate_services!(options ={})
  get_service_definitions(options).collect do |svc_def|
    ServiceStore.instantiate_service!(svc_def, umlaut_request)
  end
end

Deprecated, use #instantiate_services! with :task => Service::LinkOutFilter.



253
254
255
256
# File 'app/models/collection.rb', line 253

def link_out_service_level(level)
  instantiate_services!(:task => Service::LinkOutFilterTask,
                        :level => level)
end

#prepare_for_dispatch!Object

Goes through all services and marks them with a DispatchedService record in ‘queued’ state.

Will time out any too-old services in a running state.

Will remove DispatchedService status for any services marked failed that are old enough to re-run, or services that are too old to re-use. Such services are then queuable.

Returns array of Service identifiers for services that are now queued and execable.



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'app/models/collection.rb', line 154

def prepare_for_dispatch!
  # Go through currently dispatched services, looking for timed out
  # services -- services still in progress that have taken too long,
  # as well as service responses that are too old to be used.
  queued_service_ids = []
  DispatchedService.transaction do
    umlaut_request.dispatched_services.each do | ds |
        # go through dispatched_services and set stil in progress but too long to failed temporary
        if ( (ds.status == DispatchedService::InProgress ||
              ds.status == DispatchedService::Queued ) &&
              (Time.now - ds.updated_at) > self.background_service_timeout)

              ds.store_exception( Exception.new("background service timed out (took longer than #{self.background_service_timeout} to run); thread assumed dead.")) unless ds.exception_info
              # Fail it temporary, it'll be run again.
              ds.status = DispatchedService::FailedTemporary
              ds.save!
              logger.warn("Background service timed out, thread assumed dead. #{umlaut_request.id} / #{ds.service_id}")
         end

        # go through dispatched_services and delete:
        # 1) old completed dispatches, too old to use.
        # 2) failedtemporary dispatches that are older than our resurrection time
        # -> And all responses associated with those dispatches.
        # After being deleted, they'll end up re-queued.
        if ( (ds.completed? && completed_dispatch_expired?(ds) ) ||
             (  ds.status == DispatchedService::FailedTemporary &&
               (Time.now - ds.updated_at) > self.requeue_failedtemporary_services
              )
            )

          # Need to expire. Delete all the service responses, and
          # the DispatchedService record, and service will be automatically
          # run again.
          serv_id = ds.service_id

          umlaut_request.service_responses.each do |response|
            if response.service_id == serv_id
              umlaut_request.service_responses.delete(response)
              response.destroy
            end
          end

          umlaut_request.dispatched_services.delete(ds)
          ds.destroy
        end
    end

    # Queue any services without a dispatch marker at all, keeping
    # track of queued services, already existing or newly created.

    # Just in case, we're going to refetch dispatched_services from the db,
    # in case some other http request or background service updated things
    # recently.
    umlaut_request.dispatched_services.reset

    self.get_service_definitions.each do |service|
      service_id = service['service_id']
      # use in-memory #to_a search, don't go to db each time!
      if found = umlaut_request.dispatched_services.to_a.find {|s| s.service_id == service_id}
        queued_service_ids.push(service_id) if found.status == DispatchedService::Queued
      else
        umlaut_request.new_dispatch_object!(service_id, DispatchedService::Queued).save!
        queued_service_ids.push(service_id)
      end
    end
  end

  return queued_service_ids
end