Class: Isono::NodeModules::JobChannel
- Defined in:
- lib/isono/node_modules/job_channel.rb
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
- #cancel(job_id) ⇒ Object
- #register_endpoint(endpoint, app, opts = {}) ⇒ Object
-
#run(endpoint, command, *args) {|modify| ... } ⇒ Object
Send a new job request and wait until the job finished.
-
#submit(endpoint, command, *args) {|modify| ... } ⇒ Rack::Request, String
Send a new job request to the endpoint.
Methods inherited from Base
#config_section, #initialize, #manifest, #value_object
Constructor Details
This class inherits a constructor from Isono::NodeModules::Base
Instance Method Details
#cancel(job_id) ⇒ Object
73 74 |
# File 'lib/isono/node_modules/job_channel.rb', line 73 def cancel(job_id) end |
#register_endpoint(endpoint, app, opts = {}) ⇒ Object
76 77 78 79 80 81 82 83 84 |
# File 'lib/isono/node_modules/job_channel.rb', line 76 def register_endpoint(endpoint, app, opts={}) raise ArgumentError unless endpoint.is_a?(String) raise ArgumentError unless app.respond_to?(:call) opts = { :concurrency=>config_section.concurrency, :thread_pool => nil, }.merge(opts) rpc.register_endpoint("job.#{endpoint}", Rack::Job.new(app, JobWorker.new(@node, opts[:thread_pool])), {:prefetch=>opts[:concurrency]}) end |
#run(endpoint, command, *args) {|modify| ... } ⇒ Object
Send a new job request and wait until the job finished. The difference to submit() is that this method will stop the thread until the called job completed.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/isono/node_modules/job_channel.rb', line 51 def run(endpoint, command, *args, &blk) cur_job_ctx = Thread.current[JobWorker::JOB_CTX_KEY] req = rpc.request("job.#{endpoint}", command, *args) { |req| req.timeout_sec = config_section.timeout_sec req.request[:job_id] = Util.gen_id req.request[:session_id] = req.request[:job_id] req.request[:job_name] = command # A job is working on this current thread if cur_job_ctx is # not nil. Let the new job know the current job ID # as its parent job ID. if cur_job_ctx req.request[:parent_job_id] = cur_job_ctx.job_id req.request[:session_id] = cur_job_ctx.session_id end blk ? blk.call(req) : req.synchronize } blk ? req : req.wait end |
#submit(endpoint, command, *args) {|modify| ... } ⇒ Rack::Request, String
Send a new job request to the endpoint.
25 26 27 28 29 30 31 |
# File 'lib/isono/node_modules/job_channel.rb', line 25 def submit(endpoint, command, *args, &blk) req = run(endpoint, command, *args) { |req| blk.call(req) if blk } req.request[:job_id] end |