Class: BackgroundQueue::ServerLib::BalancedQueue
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
#add_item, #finish_item, #next_item, #remove_item
#each_item, #empty?, #has_running_items?, #number_if_items_at_priority, #number_of_priorities, #peek, #pop, #priority, #push, #remove, #stalled=, #stalled?
Constructor Details
Returns a new instance of BalancedQueue.
9
10
11
12
13
14
15
16
|
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 9
# Sets up a new queue bound to +server+.
#
# @param server [Object] owning server; it must respond to +thread_manager+,
#   whose result is cached in @thread_manager
def initialize(server)
  @server = server
  @thread_manager = server.thread_manager
  @mutex = Mutex.new
  @condvar = ConditionVariable.new
  @task_registry = BackgroundQueue::ServerLib::TaskRegistry.new
  # Explicit empty parentheses: the parent initializer is called without arguments.
  super()
end
|
Instance Attribute Details
#server ⇒ Object
Returns the value of attribute server.
7
8
9
|
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 7
# Reader for the @server instance variable.
#
# @return [Object] the value currently stored in @server
def server
@server
end
|
Instance Method Details
#add_task(task) ⇒ Object
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 18
# Registers +task+ with the task registry and, unless the registry reports
# +:waiting+, adds it to the queue.
#
# The whole operation runs inside the thread manager's +protect_access+ block.
#
# * A task flagged as replaced while waiting to retry is logged and ignored.
# * +:existing+ - the previously registered task is removed from the queue first.
# * +:waiting_to_retry+ - the previous task is marked
#   +:replaced_while_waiting_to_retry+ and finished first.
# * +:waiting+ - nothing is queued here.
#
# @param task [Object] task to add; must respond to +id+ and
#   +replaced_while_waiting_to_retry?+
def add_task(task)
  @thread_manager.protect_access do
    if task.replaced_while_waiting_to_retry?
      @server.logger.debug("Not adding task that was replaced while waiting to retry (#{task.id})")
      return
    end

    status, existing_task = @task_registry.register(task)
    unless status == :waiting
      case status
      when :existing
        @server.logger.debug("Removing existing task (#{task.id})")
        remove_item(existing_task)
      when :waiting_to_retry
        @server.logger.debug("Removing existing task that is waiting to retry (#{task.id})")
        existing_task.set_error_status(:replaced_while_waiting_to_retry)
        finish_item(existing_task)
      end
      add_item(task)
      # NOTE(review): presumably wakes consumers blocked in next_task - confirm.
      @thread_manager.signal_access
    end
  end
end
|
#add_task_to_error_list(task) ⇒ Object
This needs to be synchronised, so the update runs inside the thread manager's protect_access block.
61
62
63
64
65
66
67
|
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 61
# Marks +task+ as no longer running, sets its error status to
# +:waiting_to_retry+ and hands it to the server's error task list.
#
# All three steps run inside the thread manager's +protect_access+ block.
#
# @param task [Object] task to park; must respond to +running=+ and
#   +set_error_status+
def add_task_to_error_list(task)
  @thread_manager.protect_access do
    task.running = false
    task.set_error_status(:waiting_to_retry)
    error_list = @server.error_tasks
    error_list.add_task(task)
  end
end
|
#finish_task(task) ⇒ Object
46
47
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 46
# Marks +task+ as finished and, if another task with the same id was parked
# in the registry while this one ran, puts that parked task on the queue.
#
# Runs inside the thread manager's +protect_access+ block. A task flagged as
# replaced while waiting to retry is logged and otherwise ignored.
#
# @param task [Object] the task that has completed; must respond to +id+ and
#   +replaced_while_waiting_to_retry?+
def finish_task(task)
  @thread_manager.protect_access {
    if task.replaced_while_waiting_to_retry?
      @server.logger.debug("Not finishing task that was replaced while waiting to retry (#{task.id})")
      return
    end
    finish_item(task)
    # de_register hands back the task (if any) that was waiting for this one
    # to finish. That waiting task is what must be queued next; re-adding the
    # finished +task+ would run it again and drop the waiting one.
    waiting_task = @task_registry.de_register(task.id)
    if waiting_task
      add_item(waiting_task)
    end
  }
end
|
#load_from_file(io) ⇒ Object
108
109
110
111
112
113
114
115
116
|
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 108
# Reads a JSON array of task descriptions from +io+ and adds each one to the
# queue via +add_task+.
#
# Each entry is read with symbol keys +:owner_id+, +:job_id+, +:id+,
# +:priority+, +:worker+, +:params+ and +:options+, which are passed to
# +Task.new+ in that order.
#
# @param io [#read] source containing the JSON document
def load_from_file(io)
  @server.logger.debug("Loading task queue from file")
  entries = JSON.parse(io.read, :symbolize_names => true)
  @server.logger.debug("Adding #{entries.length} tasks from file")
  field_names = [:owner_id, :job_id, :id, :priority, :worker, :params, :options]
  entries.each do |entry|
    fields = field_names.map { |key| entry[key] }
    add_task(Task.new(*fields))
  end
end
|
#next_task ⇒ Object
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 69
# Takes the next item off the queue inside the thread manager's
# +control_access+ block.
#
# When no item is available, +wait_on_access+ is called once and +nil+ is
# still returned; the queue is not polled again after the wait.
#
# @return [Object, nil] whatever +next_item+ returned
def next_task
  fetched = nil
  @thread_manager.control_access do
    fetched = next_item
    @thread_manager.wait_on_access if fetched.nil?
  end
  fetched
end
|
#register_job(job) ⇒ Object
84
85
86
|
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 84
# Forwards +job+ to the server's job collection.
#
# @param job [Object] job to register
# @return [Object] whatever the collection's +register+ returns
def register_job(job)
  job_list = @server.jobs
  job_list.register(job)
end
|
#remove_task(task) ⇒ Object
40
41
42
43
44
|
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 40
# Removes +task+ from the queue inside the thread manager's
# +protect_access+ block.
#
# @param task [Object] task to remove
def remove_task(task)
  @thread_manager.protect_access do
    remove_item(task)
  end
end
|
#save_to_file(io) ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 92
# Serialises every queued task to +io+ as a single JSON array.
#
# The queue is walked three levels deep (queue, then each yielded item, then
# each of its items) inside the thread manager's +protect_access+ block; the
# JSON is generated and written after that block has finished.
#
# @param io [#write] destination for the JSON document
# @return [Object] the result of +io.write+
def save_to_file(io)
  @server.logger.debug("Saving task queue to file")
  entries = []
  @thread_manager.protect_access do
    each_item do |owner|
      owner.each_item do |job|
        job.each_item { |task| entries << task.to_json_object(true) }
      end
    end
  end
  @server.logger.debug("Writing #{entries.length} entries to file")
  io.write(JSON.fast_generate(entries))
end
|
#synchronous? ⇒ Boolean
88
89
90
|
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 88
# Always returns false for this queue.
# NOTE(review): presumably lets callers tell this queue apart from a
# synchronous implementation - confirm against callers.
#
# @return [Boolean] always +false+
def synchronous?
false
end
|