Class: BackgroundQueue::ServerLib::ErrorTaskList

Inherits:
Object
  • Object
show all
Defined in:
lib/background_queue/server_lib/error_task_list.rb

Defined Under Namespace

Classes: RunAt

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server) ⇒ ErrorTaskList

Returns a new instance of ErrorTaskList.



9
10
11
12
13
14
15
16
# File 'lib/background_queue/server_lib/error_task_list.rb', line 9

def initialize(server)
  @server = server
  @tasks = Containers::RBTreeMap.new
  @mutex = Mutex.new
  @current_next_at = nil
  @current_runner = nil
  @task_count = 0
end

Instance Attribute Details

#task_countObject (readonly)

Returns the value of attribute task_count.



7
8
9
# File 'lib/background_queue/server_lib/error_task_list.rb', line 7

def task_count
  @task_count
end

#tasksObject (readonly)

Returns the value of attribute tasks.



6
7
8
# File 'lib/background_queue/server_lib/error_task_list.rb', line 6

def tasks
  @tasks
end

Instance Method Details

#add_item(task, time_at) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/background_queue/server_lib/error_task_list.rb', line 29

def add_item(task, time_at)
  @mutex.synchronize {
    existing = @tasks[time_at]
    if existing.nil?
      existing = []
      @tasks[time_at] = existing
    end
    existing << task
    @task_count += 1
    queue_next_event(time_at)
  }
  @server.logger.debug("Task #{task.id} queued to retry in #{time_at - Time.now.to_f} seconds")
end

#add_task(task) ⇒ Object



18
19
20
21
22
# File 'lib/background_queue/server_lib/error_task_list.rb', line 18

def add_task(task)
  task.increment_error_count
  delay = calculate_delay(task.get_error_count)             
  add_item(task, Time.now.to_i + delay)
end

#calculate_delay(error_count) ⇒ Object



24
25
26
27
# File 'lib/background_queue/server_lib/error_task_list.rb', line 24

def calculate_delay(error_count)
  delay = error_count * error_count
  delay > 120 ? 120 : delay
end

#flushObject



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/background_queue/server_lib/error_task_list.rb', line 69

def flush
  @server.logger.debug("Flushing #{@tasks.size} tasks from error list")
  @current_runner.cancel if @current_runner
  @current_runner = nil
  @mutex.synchronize {
    while @tasks.size > 0
      next_tasks = @tasks.delete_min
      for task in next_tasks
        @server.task_queue.finish_task(task)
        @server.task_queue.add_task(task)
        @task_count -= 1
      end
    end
  }
end

#next_eventObject



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/background_queue/server_lib/error_task_list.rb', line 53

def next_event
  @mutex.synchronize {
    @current_runner = nil
    @current_next_at = nil
    while @tasks.size > 0 && @tasks.min_key < (Time.now.to_f + 0.1)
      next_tasks = @tasks.delete_min
      for task in next_tasks
        @server.task_queue.finish_task(task)
        @server.task_queue.add_task(task)
        @task_count -= 1
      end
    end
    queue_next_event(@tasks.min_key) if @tasks.size > 0
  }
end

#queue_next_event(time_at) ⇒ Object



43
44
45
46
47
48
49
50
51
# File 'lib/background_queue/server_lib/error_task_list.rb', line 43

def queue_next_event(time_at)
  if @current_next_at.nil? || @current_next_at > time_at
    @current_runner.cancel if @current_runner
    @current_next_at = time_at
    @current_runner = BackgroundQueue::ServerLib::ErrorTaskList::RunAt.new(time_at) {
      self.next_event
    }
  end
end

#wait_for_eventObject



85
86
87
88
# File 'lib/background_queue/server_lib/error_task_list.rb', line 85

def wait_for_event
  runner = @current_runner
  runner.wait_for_run if runner
end