Class: BackgroundQueue::ServerLib::BalancedQueue

Inherits:
PriorityQueue show all
Includes:
QueueRegistry
Defined in:
lib/background_queue/server_lib/balanced_queue.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from QueueRegistry

#add_item, #finish_item, #next_item, #remove_item

Methods inherited from PriorityQueue

#each_item, #empty?, #has_running_items?, #number_if_items_at_priority, #number_of_priorities, #peek, #pop, #priority, #push, #remove, #stalled=, #stalled?

Constructor Details

#initialize(server) ⇒ BalancedQueue

Returns a new instance of BalancedQueue.



9
10
11
12
13
14
15
16
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 9

def initialize(server)
  @task_registry = BackgroundQueue::ServerLib::TaskRegistry.new
  @condvar = ConditionVariable.new
  @mutex = Mutex.new
  @server = server
  @thread_manager = server.thread_manager
  super()
end

Instance Attribute Details

#serverObject (readonly)

Returns the value of attribute server.



7
8
9
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 7

def server
  @server
end

Class Method Details

.queue_classObject



80
81
82
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 80

def self.queue_class
  BackgroundQueue::ServerLib::Owner
end

Instance Method Details

#add_task(task) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 18

def add_task(task)
  @thread_manager.protect_access {
    if task.replaced_while_waiting_to_retry?
      @server.logger.debug("Not adding task that was replaced while waiting to retry (#{task.id})")
      return
    end
    status, existing_task = @task_registry.register(task)
    if status != :waiting
      if status == :existing
        @server.logger.debug("Removing existing task (#{task.id})")
        remove_item(existing_task)
      elsif status == :waiting_to_retry
        @server.logger.debug("Removing existing task that is waiting to retry (#{task.id})")
        existing_task.set_error_status(:replaced_while_waiting_to_retry)
        finish_item(existing_task)
      end
      add_item(task)
      @thread_manager.signal_access #wake anything reading from the queue
    end
  }
end

#add_task_to_error_list(task) ⇒ Object

need to synchronise this…



61
62
63
64
65
66
67
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 61

def add_task_to_error_list(task)
  @thread_manager.protect_access {
    task.running = false
    task.set_error_status(:waiting_to_retry)
    @server.error_tasks.add_task(task)
  }
end

#finish_task(task) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 46

def finish_task(task)
  @thread_manager.protect_access {
    if task.replaced_while_waiting_to_retry?
      @server.logger.debug("Not finishing task that was replaced while waiting to retry (#{task.id})")
      return
    end
    finish_item(task)
    existing_task = @task_registry.de_register(task.id)
    if existing_task
      add_item(task)
    end
  }
end

#load_from_file(io) ⇒ Object



108
109
110
111
112
113
114
115
116
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 108

def load_from_file(io)
  @server.logger.debug("Loading task queue from file")
  tasks = JSON.parse(io.read, :symbolize_names=>true)
  @server.logger.debug("Adding #{tasks.length} tasks from file")
  for task_data in tasks
    task = Task.new(task_data[:owner_id], task_data[:job_id], task_data[:id], task_data[:priority], task_data[:worker], task_data[:params], task_data[:options])
    add_task(task)
  end
end

#next_taskObject



69
70
71
72
73
74
75
76
77
78
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 69

def next_task
  task = nil
  @thread_manager.control_access {
    task = next_item
    if task.nil?
      @thread_manager.wait_on_access
    end
  }
  task
end

#register_job(job) ⇒ Object



84
85
86
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 84

def register_job(job)
  @server.jobs.register(job)
end

#remove_task(task) ⇒ Object



40
41
42
43
44
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 40

def remove_task(task)
  @thread_manager.protect_access {
    remove_item(task)
  }
end

#save_to_file(io) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 92

def save_to_file(io)
  data = []
  @server.logger.debug("Saving task queue to file")
  @thread_manager.protect_access {
    each_item { |owner|
      owner.each_item { |job|
        job.each_item { |task|
          data << task.to_json_object(true)
        }
      }
    }
  }
  @server.logger.debug("Writing #{data.length} entries to file")
  io.write(JSON.fast_generate(data))
end

#synchronous?Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 88

def synchronous?
  false
end