Class: BackgroundQueue::ServerLib::ErrorTaskList
- Inherits:
-
Object
- Object
- BackgroundQueue::ServerLib::ErrorTaskList
- Defined in:
- lib/background_queue/server_lib/error_task_list.rb
Defined Under Namespace
Classes: RunAt
Instance Attribute Summary collapse
-
#task_count ⇒ Object
readonly
Returns the value of attribute task_count.
-
#tasks ⇒ Object
readonly
Returns the value of attribute tasks.
Instance Method Summary collapse
- #add_item(task, time_at) ⇒ Object
- #add_task(task) ⇒ Object
- #calculate_delay(error_count) ⇒ Object
- #flush ⇒ Object
-
#initialize(server) ⇒ ErrorTaskList
constructor
A new instance of ErrorTaskList.
- #next_event ⇒ Object
- #queue_next_event(time_at) ⇒ Object
- #wait_for_event ⇒ Object
Constructor Details
#initialize(server) ⇒ ErrorTaskList
Returns a new instance of ErrorTaskList.
9 10 11 12 13 14 15 16 |
# File 'lib/background_queue/server_lib/error_task_list.rb', line 9 def initialize(server) @server = server @tasks = Containers::RBTreeMap.new @mutex = Mutex.new @current_next_at = nil @current_runner = nil @task_count = 0 end |
Instance Attribute Details
#task_count ⇒ Object (readonly)
Returns the value of attribute task_count.
7 8 9 |
# File 'lib/background_queue/server_lib/error_task_list.rb', line 7 def task_count @task_count end |
#tasks ⇒ Object (readonly)
Returns the value of attribute tasks.
6 7 8 |
# File 'lib/background_queue/server_lib/error_task_list.rb', line 6 def tasks @tasks end |
Instance Method Details
#add_item(task, time_at) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/background_queue/server_lib/error_task_list.rb', line 29 def add_item(task, time_at) @mutex.synchronize { existing = @tasks[time_at] if existing.nil? existing = [] @tasks[time_at] = existing end existing << task @task_count += 1 queue_next_event(time_at) } @server.logger.debug("Task #{task.id} queued to retry in #{time_at - Time.now.to_f} seconds") end |
#add_task(task) ⇒ Object
18 19 20 21 22 |
# File 'lib/background_queue/server_lib/error_task_list.rb', line 18 def add_task(task) task.increment_error_count delay = calculate_delay(task.get_error_count) add_item(task, Time.now.to_i + delay) end |
#calculate_delay(error_count) ⇒ Object
24 25 26 27 |
# File 'lib/background_queue/server_lib/error_task_list.rb', line 24 def calculate_delay(error_count) delay = error_count * error_count delay > 120 ? 120 : delay end |
#flush ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/background_queue/server_lib/error_task_list.rb', line 69 def flush @server.logger.debug("Flushing #{@tasks.size} tasks from error list") @current_runner.cancel if @current_runner @current_runner = nil @mutex.synchronize { while @tasks.size > 0 next_tasks = @tasks.delete_min for task in next_tasks @server.task_queue.finish_task(task) @server.task_queue.add_task(task) @task_count -= 1 end end } end |
#next_event ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/background_queue/server_lib/error_task_list.rb', line 53 def next_event @mutex.synchronize { @current_runner = nil @current_next_at = nil while @tasks.size > 0 && @tasks.min_key < (Time.now.to_f + 0.1) next_tasks = @tasks.delete_min for task in next_tasks @server.task_queue.finish_task(task) @server.task_queue.add_task(task) @task_count -= 1 end end queue_next_event(@tasks.min_key) if @tasks.size > 0 } end |
#queue_next_event(time_at) ⇒ Object
43 44 45 46 47 48 49 50 51 |
# File 'lib/background_queue/server_lib/error_task_list.rb', line 43 def queue_next_event(time_at) if @current_next_at.nil? || @current_next_at > time_at @current_runner.cancel if @current_runner @current_next_at = time_at @current_runner = BackgroundQueue::ServerLib::ErrorTaskList::RunAt.new(time_at) { self.next_event } end end |
#wait_for_event ⇒ Object
85 86 87 88 |
# File 'lib/background_queue/server_lib/error_task_list.rb', line 85 def wait_for_event runner = @current_runner runner.wait_for_run if runner end |