Class: QueueProcessor::RootCalculation::QueueControl::CalculationRequest

Inherits:
Struct
  • Object
show all
Defined in:
lib/queue-processor/root_calculation/queue_control.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#clazz ⇒ Object

Returns the value of attribute clazz

Returns:

  • (Object)

    the current value of clazz



101
102
103
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 101

# Reader for the +clazz+ attribute.
#
# +perform+ calls +where+ on this value to look up the record to process,
# and interpolates it into the "Does not exist" warning.
#
# NOTE(review): the class is listed as inheriting from Struct, so this reader
# is presumably generated from a Struct member rather than hand-written —
# confirm against queue_control.rb.
#
# @return [Object] the current value of clazz
def clazz
  @clazz
end

#description ⇒ Object

Returns the value of attribute description

Returns:

  • (Object)

    the current value of description



101
102
103
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 101

# Reader for the +description+ attribute.
#
# The value is interpolated into the text returned by +display_name+.
#
# NOTE(review): the class is listed as inheriting from Struct, so this reader
# is presumably generated from a Struct member rather than hand-written —
# confirm against queue_control.rb.
#
# @return [Object] the current value of description
def description
  @description
end

#id ⇒ Object

Returns the value of attribute id

Returns:

  • (Object)

    the current value of id



101
102
103
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 101

# Reader for the +id+ attribute.
#
# +perform+ passes this value as the id to match when querying +clazz+, and
# +display_name+ includes it in its text.
#
# NOTE(review): the class is listed as inheriting from Struct, so this reader
# is presumably generated from a Struct member rather than hand-written —
# confirm against queue_control.rb.
#
# @return [Object] the current value of id
def id
  @id
end

#queued_at ⇒ Object

Returns the value of attribute queued_at

Returns:

  • (Object)

    the current value of queued_at



101
102
103
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 101

# Reader for the +queued_at+ attribute.
#
# +perform+ compares this value (truncated to milliseconds) with the record's
# +queued_at+ column, copies it to the record's +delayed_job_queued_at+, and
# subtracts it from +Time.now+ to decide whether a running calculation has
# been stuck for too long. +display_name+ includes it in its text.
#
# NOTE(review): the class is listed as inheriting from Struct, so this reader
# is presumably generated from a Struct member rather than hand-written —
# confirm against queue_control.rb.
#
# @return [Object] the current value of queued_at
def queued_at
  @queued_at
end

Instance Method Details

#display_name ⇒ Object



138
139
140
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 138

# Builds a human-readable label for this request from its +description+,
# +id+ and +queued_at+ attributes.
#
# @return [String] text of the form
#   "calculate <description> (id=<id>, queued_at=<queued_at>)"
def display_name
  format('calculate %s (id=%s, queued_at=%s)', description, id, queued_at)
end

#perform ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 102

# Processes the queued calculation request (the log messages indicate this is
# run from a delayed job).
#
# The record is looked up in +clazz+ by +id+ and by +queued_at+, compared at
# millisecond precision. If no record matches, a warning is logged and nothing
# else happens.
#
# For a matching record:
# * +delayed_job_queued_at+ is set to this request's +queued_at+;
# * the +dequeue_processing+ event is fired and, if that succeeds, the
#   +start_processing+ event;
# * if +dequeue_processing+ could not be fired and the record reports
#   +calculating?+, the request is either re-queued or, once it has waited at
#   least +recover_after+ seconds, the record's state machines are reset and
#   it is added to the queue again;
# * if +dequeue_processing+ could not be fired and the record is not
#   calculating, nothing further is done.
#
# The return value is not meaningful.
def perform
  obj = clazz.where("id=? and date_trunc('milliseconds',queued_at)=date_trunc('milliseconds',?::timestamp)", id, queued_at).first
  return log_skipped_request unless obj.present?

  Rails.logger.debug { "#{obj.describe}: dequeued by delayed job" }
  obj.delayed_job_queued_at = queued_at

  if obj.fire_events(:dequeue_processing)
    Rails.logger.info("#{obj.describe}: starting processing.")
    if obj.fire_events(:start_processing)
      Rails.logger.info("#{obj.describe}: finished calculation and done in delayed job.")
    end
  elsif obj.calculating?
    # somebody else is running, we have to wait for them to finish
    wait_for_running_calculation(obj)
  end
end

# Logs why no record matched this request: either no record with +id+ exists,
# or it exists with a different +queued_at+.
def log_skipped_request
  obj = clazz.where(:id => id).first
  if obj.nil?
    Rails.logger.warn("#{clazz}: #{id}: Skipping: #{id} - Does not exist")
  else
    Rails.logger.warn("#{obj.describe}: Skipped - Queued at: #{obj.queued_at} doesn't match delayed job queued at: #{queued_at}")
  end
end

# Re-queues the request while another run is calculating, or resets and
# re-adds the record once the wait has reached +recover_after+ seconds.
def wait_for_running_calculation(obj)
  timeout = obj.recover_after
  if timeout.present? && (Time.now - queued_at) >= timeout
    Rails.logger.error("#{obj.describe}: has been stuck processing for too long (#{timeout} seconds) - resetting state machines and re-queueing")
    obj.reset_state_machines!
    obj.add_to_queue
  else
    obj.requeue_work(queued_at)
  end
end

private :log_skipped_request, :wait_for_running_calculation