Class: Noda::JobWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/noda/job_worker.rb

Overview

ジョブワーカー

ジョブを待ち受けるスレッドです。 Taskを取りだして実行します.

ip=127.0.0.1
w=Noda::JobWorker.new("#{ip}", "10001")
t = DRb.start_service("druby://#{ip}:10101",w)
w.start

Taskをサーバー経由で送信する

ip=127.0.0.1
server = Noda::JobServer.new ip,"10001"
str = %Q'
       class Noda::MyTask
         def do_task(table)
            table.put @name, "#{Process.pid} : #{Time.now}"
            return "#{@name} in #{Process.pid} : #{Time.now}"
         end
         def initialize(name) @name end
       end
'
eval(str)
task = Noda::MyTask.new("test")
server.add_task_class( task.class.to_s, str)
10.times{|i| server.input.push Noda::MyTask.new(i) }

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server_addr = "localhost", server_port = "10001", q = "") ⇒ JobWorker

  • server_addr ジョブサーバーアドレス、またはホスト名

  • server_port ジョブサーバーポート



34
35
36
37
38
39
40
41
42
# File 'lib/noda/job_worker.rb', line 34

def initialize( server_addr="localhost",server_port="10001",q="" )
  @server_uri  = "druby://#{server_addr}:#{server_port}"
  @max_retry_connect  = 30
  @wait_time_to_retry =  2
  require "socket" 
  @local_addr = IPSocket::getaddress(Socket::gethostname)
  self.connect
  self
end

Instance Attribute Details

#max_retry_connectObject

Returns the value of attribute max_retry_connect.



31
32
33
# File 'lib/noda/job_worker.rb', line 31

def max_retry_connect
  @max_retry_connect
end

#threadObject (readonly)

Returns the value of attribute thread.



30
31
32
# File 'lib/noda/job_worker.rb', line 30

def thread
  @thread
end

#wait_time_to_retryObject

Returns the value of attribute wait_time_to_retry.



31
32
33
# File 'lib/noda/job_worker.rb', line 31

def wait_time_to_retry
  @wait_time_to_retry
end

Instance Method Details

#connectObject

サーバーに接続します



92
93
94
# File 'lib/noda/job_worker.rb', line 92

def connect
  self.connect_job_server
end

#connect_job_serverObject

内部的に使います。ジョブサーバーへ接続



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/noda/job_worker.rb', line 44

def connect_job_server
  error_conter = 0
  begin 
    @job =DRbObject.new_with_uri(@server_uri)
    @job.hash_table
    @logger = @job.logger
  rescue DRb::DRbConnError => e
    error_conter +=1
    raise e if error_conter > @max_retry_connect
    sleep @wait_time_to_retry 
    retry
  end
end

#handle_taskObject

担当ジョブからタスクを実行します.

タスクは do_task(hash)実装が必須 タスクのクラス定義はrequire必須.(start前にrequire) タスクのクラス定義はサーバー側から自動ロード(eval)します.



62
63
64
65
66
67
68
69
70
71
# File 'lib/noda/job_worker.rb', line 62

def handle_task()
  # @logger.info("self.class@#{@local_addr}#{self.object_id}"){"i try to pop a task."} 
  task = @job.input.pop
  if task.class ==  DRb::DRbUnknown
    self.load_class(task.name)
    task = task.reload
  end
  result = task.do_task(@job.hash_table)
  @job.output.push result
end

#init_threadObject

ワーカーのメインスレッドを起動します.start で使います.



80
81
82
83
84
85
86
87
88
# File 'lib/noda/job_worker.rb', line 80

def init_thread
  @table = @job.hash_table
  @thread= Thread.new{
      loop{
        self.handle_task()
        sleep 0.001
      }
  }
end

#load_class(name) ⇒ Object

クラス定義をEvalする。クラス定義はサーバーから取り出す. ワーカー側にクラス定義を動的に渡すときに使います. *name クラス名



75
76
77
78
# File 'lib/noda/job_worker.rb', line 75

def load_class(name)
  s = @job.task_class(name)
  Noda.module_eval(s) if s
end

#startObject

処理を開始します.

threadを返します. worker を起動しっぱなしにするなら thread.joinしてください



99
100
101
102
# File 'lib/noda/job_worker.rb', line 99

def start
  self.init_thread
  @thread.join
end

#statusObject

ワーカースレッドの状態を取り出します.

マルチスレッドでブロックされてるとsleep になります



107
108
109
# File 'lib/noda/job_worker.rb', line 107

def status
  @thread.status if @thread
end

#stopObject

スレッド停止します.このインスタンスは死にません.start で再起動します.



112
113
114
# File 'lib/noda/job_worker.rb', line 112

def stop
  @thread.kill
end