Module: Qwirk::Task
- Includes:
- Rumx::Bean
- Defined in:
- lib/qwirk/task.rb
Overview
The following options can be used for configuring the class
:max_pending_records => <integer>
This is how many records can be queued at a time.
:
Defined Under Namespace
Modules: ClassMethods
Class Method Summary collapse
Instance Method Summary collapse
- #finished_publishing ⇒ Object
- #initialize(publisher, task_id, total_count, opts = {}) ⇒ Object
- #on_done ⇒ Object
- #on_exception(request, exception) ⇒ Object
-
#on_response(request, response) ⇒ Object
Stuff to override.
- #on_update ⇒ Object
- #publish(object) ⇒ Object
- #retry=(val) ⇒ Object
-
#start ⇒ Object
TODO: Needed?.
- #stop ⇒ Object
Class Method Details
.included(base) ⇒ Object
23 24 25 26 27 |
# File 'lib/qwirk/task.rb', line 23 def self.included(base) #Qwirk::BaseWorker.included(base) ::Rumx::Bean.included(base) base.extend(ClassMethods) end |
Instance Method Details
#finished_publishing ⇒ Object
95 96 97 98 99 |
# File 'lib/qwirk/task.rb', line 95 def finished_publishing @finished_publishing = true @pending_hash_mutex.synchronize { check_finish } @reply_thread.join end |
#initialize(publisher, task_id, total_count, opts = {}) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/qwirk/task.rb', line 29 def initialize(publisher, task_id, total_count, opts={}) @publisher = publisher @pending_hash = Hash.new @pending_hash_mutex = Mutex.new @pending_hash_condition = ConditionVariable.new @task_id = task_id @stopped = false @finished_publishing = false @max_pending_records = opts[:max_pending_records] || 100 @retry = opts[:retry] @auto_retry = opts[:auto_retry] @success_count = 0 @exception_count = 0 @total_count = total_count @exceptions_per_run = [] @producer, @consumer = publisher.create_producer_consumer_pair(self) @reply_thread = Thread.new do java.lang.Thread.current_thread.name = "Qwirk task: #{task_id}" reply_event_loop on_done end end |
#on_done ⇒ Object
63 64 |
# File 'lib/qwirk/task.rb', line 63 def on_done() end |
#on_exception(request, exception) ⇒ Object
57 58 |
# File 'lib/qwirk/task.rb', line 57 def on_exception(request, exception) end |
#on_response(request, response) ⇒ Object
Stuff to override
54 55 |
# File 'lib/qwirk/task.rb', line 54 def on_response(request, response) end |
#on_update ⇒ Object
60 61 |
# File 'lib/qwirk/task.rb', line 60 def on_update() end |
#publish(object) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/qwirk/task.rb', line 73 def publish(object) marshaled_object = @publisher.marshaler.marshal(object) @pending_hash_mutex.synchronize do while !@stopped && @pending_hash.size >= @max_pending_records @pending_hash_condition.wait(@pending_hash_mutex) end unless @stopped = @producer.send(marshaled_object) @pending_hash[] = object end end end |
#retry=(val) ⇒ Object
66 67 68 69 70 71 |
# File 'lib/qwirk/task.rb', line 66 def retry=(val) @retry = val if val @pending_hash_mutex.synchronize { check_retry } end end |
#start ⇒ Object
TODO: Needed?
87 88 |
# File 'lib/qwirk/task.rb', line 87 def start end |
#stop ⇒ Object
90 91 92 93 |
# File 'lib/qwirk/task.rb', line 90 def stop @pending_hash_mutex.synchronize { do_stop } @reply_thread.join end |