Class: Fairy::PTask

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/node/p-task.rb

Constant Summary collapse

END_OF_STREAM =
:END_OF_STREAM
ST_INIT =
:ST_INIT
ST_ACTIVATE =
:ST_ACTIVATE
ST_FINISH =
:ST_FINISH

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id, processor) ⇒ PTask

Returns a new instance of PTask.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/fairy/node/p-task.rb', line 16

# Builds a task in the ST_INIT state and starts its status watcher.
#
# id::        identifier stored in @id (used by log_id, to_s and
#             njob_next_id).
# processor:: owning processor; it supplies the njob monitor and receives
#             status notifications.
def initialize(id, processor)
  # @id is assigned before logging, as in the original ordering.
  @id = id
  Log.info(self, "CREATE PTask")

  @processor = processor

  # Njob bookkeeping: created njobs plus a mutex-guarded sequence counter.
  @njobs = []
  @njob_seq = -1
  @njob_seq_mutex = Mutex.new

  # Status bookkeeping: a monitor derived from the processor's njob monitor
  # and a condition variable that status= broadcasts on.
  njob_monitor = processor.njob_mon
  @status = ST_INIT
  @status_mon = njob_monitor.new_mon
  @status_cv = @status_mon.new_cv

  # Must run last: the watcher reads the state initialised above.
  start_watch_status
end

Instance Attribute Details

#processor ⇒ Object (readonly)

Returns the value of attribute processor.



33
34
35
# File 'lib/fairy/node/p-task.rb', line 33

# Returns the processor stored in @processor (the object this task sends
# its status notifications to).
def processor
  @processor
end

Instance Method Details

#abort_running ⇒ Object



60
61
62
63
64
# File 'lib/fairy/node/p-task.rb', line 60

# Aborts the most recently created njob, if the task is currently running.
#
# Nothing happens while the task is in ST_INIT or ST_FINISH, or when no njob
# has been created yet. The status check and the abort call both run inside
# @status_mon.synchronize.
#
# Returns whatever the njob's abort_running returns, or nil when nothing was
# aborted.
def abort_running
  @status_mon.synchronize do
    next if [ST_INIT, ST_FINISH].include?(@status)

    # @njobs can still be empty here: status= is public, so the status may
    # leave ST_INIT before create_njob has pushed anything. Calling
    # abort_running on nil would raise NoMethodError.
    latest = @njobs.last
    latest.abort_running if latest
  end
end

#create_njob(njob_class_name, bjob, opts, *rests) ⇒ Object



50
51
52
53
54
55
56
57
# File 'lib/fairy/node/p-task.rb', line 50

# Instantiates a new njob for this task and appends it to @njobs.
#
# njob_class_name:: name handed to @processor.import to obtain the class.
# bjob, opts, rests:: forwarded unchanged to the njob constructor, after the
#                     generated njob id and this task.
#
# Returns the newly created njob.
def create_njob(njob_class_name, bjob, opts, *rests)
  njob_class = @processor.import(njob_class_name)
  created = njob_class.new(njob_next_id, self, bjob, opts, *rests)
  @njobs << created
  Log.debugf(self, "Njob number of %d", @njobs.size)
  created
end

#log_id ⇒ Object



35
36
37
# File 'lib/fairy/node/p-task.rb', line 35

# Returns the label identifying this task, built from @id,
# e.g. "PTask[3]".
def log_id
  format("PTask[%s]", @id)
end

#njob_next_id ⇒ Object

njob methods



42
43
44
45
46
47
48
# File 'lib/fairy/node/p-task.rb', line 42

# Allocates the next njob id for this task.
#
# Increments @njob_seq while holding @njob_seq_mutex and returns a string of
# the form "<task id>-<sequence>", each part zero-padded to at least two
# digits (e.g. "03-00").
def njob_next_id
  seq = @njob_seq_mutex.synchronize { @njob_seq += 1 }
  format("%02d-%02d", @id, seq)
end

#notice_status(st) ⇒ Object



98
99
100
101
102
# File 'lib/fairy/node/p-task.rb', line 98

# Forwards a status value for this task to the processor.
#
# st:: the status to report.
#
# Returns whatever @processor.update_status returns.
#
# NOTE(review): an earlier revision wrapped this call in @status_mon.entry;
# that wrapper is disabled, so the call runs directly in the caller.
def notice_status(st)
  @processor.update_status(self, st)
end

#start_watch_status ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/fairy/node/p-task.rb', line 76

# Reports the current status to the processor, then hands a watcher block to
# the processor's njob monitor that re-reports every later status change.
#
# The watcher runs while holding @status_mon: it waits on @status_cv until
# @status differs from the value it last reported, forwards the new value
# through notice_status, and leaves the loop once ST_FINISH has been
# reported.
#
# NOTE(review): njob_mon.entry presumably runs the block on a separate
# thread so that this call does not block the caller -- confirm against the
# monitor implementation.
#
# Returns nil.
def start_watch_status
  # Report the initial status.
  notice_status(@status)

  @processor.njob_mon.entry do
	@status_mon.synchronize do
	  old_status = nil
	  loop do
 # Sleep until status= broadcasts a value different from the last one seen.
 @status_cv.wait_while{old_status == @status}
 old_status = @status
 notice_status(@status)
 # ST_FINISH is reported above before the watcher stops.
 break if @status == ST_FINISH
	  end
	end
  end
  nil
end

#status=(val) ⇒ Object

status methods.



69
70
71
72
73
74
# File 'lib/fairy/node/p-task.rb', line 69

# Stores a new status and wakes everything waiting on the status condition
# variable.
#
# val:: the new status value.
#
# Both the assignment and the broadcast happen inside
# @status_mon.synchronize.
def status=(val)
  @status_mon.synchronize do
    @status = val
    @status_cv.broadcast
  end
end

#to_s ⇒ Object



104
105
106
# File 'lib/fairy/node/p-task.rb', line 104

# Returns a short description of the form
# "#<Class:#id NjobA-NjobB>": the njob class names are joined with "-" and
# every "Fairy::" namespace prefix is stripped from them.
def to_s
  njob_names = @njobs.map { |njob| njob.class.to_s }.join('-').gsub(/Fairy::/, "")
  "#<#{self.class}:##{@id} #{njob_names}>"
end

#update_status(node, st) ⇒ Object



94
95
96
# File 'lib/fairy/node/p-task.rb', line 94

# Sets this task's status to st through the status= writer.
#
# node:: accepted but not used by this method.
# st::   the new status value.
#
# Returns st (the value of the assignment expression).
def update_status(node, st)
  self.status = st
end