Class: Noda::JobWorker
- Inherits:
-
Object
- Object
- Noda::JobWorker
- Defined in:
- lib/noda/job_worker.rb
Overview
ジョブワーカー
ジョブを待ち受けるスレッドです。 Taskを取りだして実行します.
ip=127.0.0.1
w=Noda::JobWorker.new("#{ip}", "10001")
t = DRb.start_service("druby://#{ip}:10101",w)
w.start
Taskをサーバー経由で送信する
ip=127.0.0.1
server = Noda::JobServer.new ip,"10001"
str = %Q'
class Noda::MyTask
def do_task(table)
table.put @name, "#{Process.pid} : #{Time.now}"
return "#{@name} in #{Process.pid} : #{Time.now}"
end
def initialize(name) @name end
end
'
eval(str)
task = Noda::MyTask.new("test")
server.add_task_class( task.class.to_s, str)
10.times{|i| server.input.push Noda::MyTask.new(i) }
Instance Attribute Summary collapse
-
#max_retry_connect ⇒ Object
Returns the value of attribute max_retry_connect.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
-
#wait_time_to_retry ⇒ Object
Returns the value of attribute wait_time_to_retry.
Instance Method Summary collapse
-
#connect ⇒ Object
サーバーに接続します.
-
#connect_job_server ⇒ Object
内部的に使います。ジョブサーバーへ接続.
-
#handle_task ⇒ Object
担当ジョブからタスクを実行します..
-
#init_thread ⇒ Object
ワーカーのメインスレッドを起動します.start で使います..
-
#initialize(server_addr = "localhost", server_port = "10001", q = "") ⇒ JobWorker
constructor
-
server_addr ジョブサーバーアドレス、またはホスト名 * server_port ジョブサーバーポート.
-
-
#load_class(name) ⇒ Object
クラス定義をEvalする。クラス定義はサーバーから取り出す. ワーカー側にクラス定義を動的に渡すときに使います. *name クラス名.
-
#start ⇒ Object
処理を開始します..
-
#status ⇒ Object
ワーカースレッドの状態を取り出します..
-
#stop ⇒ Object
スレッド停止します.このインスタンスは死にません.start で再起動します..
Constructor Details
#initialize(server_addr = "localhost", server_port = "10001", q = "") ⇒ JobWorker
-
server_addr ジョブサーバーアドレス、またはホスト名
-
server_port ジョブサーバーポート
34 35 36 37 38 39 40 41 42 |
# File 'lib/noda/job_worker.rb', line 34 def initialize( server_addr="localhost",server_port="10001",q="" ) @server_uri = "druby://#{server_addr}:#{server_port}" @max_retry_connect = 30 @wait_time_to_retry = 2 require "socket" @local_addr = IPSocket::getaddress(Socket::gethostname) self.connect self end |
Instance Attribute Details
#max_retry_connect ⇒ Object
Returns the value of attribute max_retry_connect.
31 32 33 |
# File 'lib/noda/job_worker.rb', line 31 def max_retry_connect @max_retry_connect end |
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
30 31 32 |
# File 'lib/noda/job_worker.rb', line 30 def thread @thread end |
#wait_time_to_retry ⇒ Object
Returns the value of attribute wait_time_to_retry.
31 32 33 |
# File 'lib/noda/job_worker.rb', line 31 def wait_time_to_retry @wait_time_to_retry end |
Instance Method Details
#connect ⇒ Object
サーバーに接続します
92 93 94 |
# File 'lib/noda/job_worker.rb', line 92 def connect self.connect_job_server end |
#connect_job_server ⇒ Object
内部的に使います。ジョブサーバーへ接続
44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/noda/job_worker.rb', line 44 def connect_job_server error_conter = 0 begin @job =DRbObject.new_with_uri(@server_uri) @job.hash_table @logger = @job.logger rescue DRb::DRbConnError => e error_conter +=1 raise e if error_conter > @max_retry_connect sleep @wait_time_to_retry retry end end |
#handle_task ⇒ Object
担当ジョブからタスクを実行します.
タスクは do_task(hash)実装が必須 タスクのクラス定義はrequire必須.(start前にrequire) タスクのクラス定義はサーバー側から自動ロード(eval)します.
62 63 64 65 66 67 68 69 70 71 |
# File 'lib/noda/job_worker.rb', line 62 def handle_task() # @logger.info("self.class@#{@local_addr}#{self.object_id}"){"i try to pop a task."} task = @job.input.pop if task.class == DRb::DRbUnknown self.load_class(task.name) task = task.reload end result = task.do_task(@job.hash_table) @job.output.push result end |
#init_thread ⇒ Object
ワーカーのメインスレッドを起動します.start で使います.
80 81 82 83 84 85 86 87 88 |
# File 'lib/noda/job_worker.rb', line 80 def init_thread @table = @job.hash_table @thread= Thread.new{ loop{ self.handle_task() sleep 0.001 } } end |
#load_class(name) ⇒ Object
クラス定義をEvalする。クラス定義はサーバーから取り出す. ワーカー側にクラス定義を動的に渡すときに使います. *name クラス名
75 76 77 78 |
# File 'lib/noda/job_worker.rb', line 75 def load_class(name) s = @job.task_class(name) Noda.module_eval(s) if s end |
#start ⇒ Object
処理を開始します.
threadを返します. worker を起動しっぱなしにするなら thread.joinしてください
99 100 101 102 |
# File 'lib/noda/job_worker.rb', line 99 def start self.init_thread @thread.join end |
#status ⇒ Object
ワーカースレッドの状態を取り出します.
マルチスレッドでブロックされてるとsleep になります
107 108 109 |
# File 'lib/noda/job_worker.rb', line 107 def status @thread.status if @thread end |
#stop ⇒ Object
スレッド停止します.このインスタンスは死にません.start で再起動します.
112 113 114 |
# File 'lib/noda/job_worker.rb', line 112 def stop @thread.kill end |