Class: DRbQS::Server::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/drbqs/server/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue, result, logger = DRbQS::Misc::LoggerDummy.new) ⇒ Queue

Returns a new instance of Queue.



9
10
11
12
13
14
15
16
17
# File 'lib/drbqs/server/queue.rb', line 9

def initialize(queue, result, logger = DRbQS::Misc::LoggerDummy.new)
  @queue = queue
  @result = result
  @task_id = 0
  @cache = {}
  @calculating = Hash.new { |hash, key| hash[key] = Array.new }
  @history = DRbQS::Server::TaskHistory.new
  @logger = logger
end

Instance Attribute Details

#calculatingObject (readonly)

Returns the value of attribute calculating.



7
8
9
# File 'lib/drbqs/server/queue.rb', line 7

def calculating
  @calculating
end

#historyObject (readonly)

Returns the value of attribute history.



7
8
9
# File 'lib/drbqs/server/queue.rb', line 7

def history
  @history
end

Instance Method Details

#add(task) ⇒ Fixnum

Returns task ID (for debug).

Returns:

  • (Fixnum)

    task ID (for debug)



25
26
27
28
29
30
31
32
# File 'lib/drbqs/server/queue.rb', line 25

def add(task)
  @task_id += 1
  @logger.info("New task: #{@task_id}")
  @cache[@task_id] = task
  queue_task(@task_id)
  @history.set(@task_id, :add, task.note)
  @task_id
end

#all_logsObject



141
142
143
# File 'lib/drbqs/server/queue.rb', line 141

def all_logs
  @history.log_strings
end

#calculating_nodesObject



145
146
147
148
149
150
151
152
153
# File 'lib/drbqs/server/queue.rb', line 145

def calculating_nodes
  nodes = []
  @calculating.each do |node_id, tasks|
    if tasks.size > 0
      nodes << node_id
    end
  end
  nodes.sort!
end

#calculating_task_messageObject

Return a hash of which keys are node ID number and values are an array of pairs of task ID number and its message.



105
106
107
108
109
110
111
112
113
# File 'lib/drbqs/server/queue.rb', line 105

def calculating_task_message
  mes = {}
  @calculating.each do |node_id, task_id_ary|
    mes[node_id] = task_id_ary.map do |n|
      [n, @history.events(n)[0][2]]
    end
  end
  mes
end

#calculating_task_numberObject



115
116
117
# File 'lib/drbqs/server/queue.rb', line 115

def calculating_task_number
  @calculating.inject(0) { |s, key_val| s + key_val[1].size }
end

#empty?Boolean

If queue is empty, that is, there is no tasks to calculate next, this method returns true. Otherwise, false. Even if there are calculating tasks, the method can return true.

Returns:

  • (Boolean)


131
132
133
# File 'lib/drbqs/server/queue.rb', line 131

def empty?
  stocked_task_number == 0
end

#exec_task_hook(main_server, task_id, result) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
# File 'lib/drbqs/server/queue.rb', line 73

def exec_task_hook(main_server, task_id, result)
  if task = @cache.delete(task_id)
    if task.exec_hook(main_server, result)
      @history.set(task_id, :hook)
    end
    true
  else
    @logger.error("Task #{task_id} is not cached.")
    false
  end
end

#finished?Boolean

If there are no tasks in queue and calculating, return true. Otherwise, false.

Returns:

  • (Boolean)


137
138
139
# File 'lib/drbqs/server/queue.rb', line 137

def finished?
  @cache.size == 0
end

#finished_task_numberObject



123
124
125
# File 'lib/drbqs/server/queue.rb', line 123

def finished_task_number
  @history.finished_task_number
end

#get_accept_signalObject



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/drbqs/server/queue.rb', line 34

def get_accept_signal
  count = 0
  begin
    loop do
      sym, task_id, node_id = @result.take([:accept, Fixnum, Fixnum], 0)
      count += 1
      @calculating[node_id] << task_id
      @history.set(task_id, :calculate, node_id)
      @logger.info("Accept: task #{task_id} by node #{node_id}.")
    end
  rescue Rinda::RequestExpiredError
    @logger.debug("Accept: #{count} signals.")
  end
  count
end

#get_result(main_server) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/drbqs/server/queue.rb', line 85

def get_result(main_server)
  count = 0
  begin
    loop do
      get_accept_signal
      sym, task_id, node_id, result = @result.take([:result, Fixnum, Fixnum, nil], 0)
      count += 1
      @history.set(task_id, :result, node_id)
      @logger.info("Get: result of #{task_id} from node #{node_id}.")
      delete_task_id(node_id, task_id)
      exec_task_hook(main_server, task_id, result)
    end
  rescue Rinda::RequestExpiredError
    @logger.debug("Get: #{count} results.")
  end
  count
end

#requeue_for_deleted_node_id(deleted) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/drbqs/server/queue.rb', line 50

def requeue_for_deleted_node_id(deleted)
  deleted.each do |node_id|
    if task_id_ary = @calculating[node_id]
      task_id_ary.each do |task_id|
        queue_task(task_id)
        @history.set(task_id, :requeue)
        @logger.info("Requeue: task #{task_id}.")
      end
      @calculating.delete(node_id)
    end
  end
end

#stocked_task_numberObject



119
120
121
# File 'lib/drbqs/server/queue.rb', line 119

def stocked_task_number
  @cache.size - calculating_task_number
end