Module: Qwirk::Task

Includes:
Rumx::Bean
Defined in:
lib/qwirk/task.rb

Overview

The following options can be used for configuring the class

:max_pending_records => <integer>
  This is how many records can be queued at a time.
:

Defined Under Namespace

Modules: ClassMethods

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



23
24
25
26
27
# File 'lib/qwirk/task.rb', line 23

def self.included(base)
  #Qwirk::BaseWorker.included(base)
  ::Rumx::Bean.included(base)
  base.extend(ClassMethods)
end

Instance Method Details

#finished_publishingObject



95
96
97
98
99
# File 'lib/qwirk/task.rb', line 95

def finished_publishing
  @finished_publishing = true
  @pending_hash_mutex.synchronize { check_finish }
  @reply_thread.join
end

#initialize(publisher, task_id, total_count, opts = {}) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/qwirk/task.rb', line 29

def initialize(publisher, task_id, total_count, opts={})
  @publisher              = publisher
  @pending_hash           = Hash.new
  @pending_hash_mutex     = Mutex.new
  @pending_hash_condition = ConditionVariable.new
  @task_id                = task_id
  @stopped                = false
  @finished_publishing    = false
  @max_pending_records    = opts[:max_pending_records] || 100
  @retry                  = opts[:retry]
  @auto_retry             = opts[:auto_retry]
  @success_count          = 0
  @exception_count        = 0
  @total_count            = total_count
  @exceptions_per_run     = []

  @producer, @consumer   = publisher.create_producer_consumer_pair(self)
  @reply_thread = Thread.new do
    java.lang.Thread.current_thread.name = "Qwirk task: #{task_id}"
    reply_event_loop
    on_done
  end
end

#on_doneObject



63
64
# File 'lib/qwirk/task.rb', line 63

def on_done()
end

#on_exception(request, exception) ⇒ Object



57
58
# File 'lib/qwirk/task.rb', line 57

def on_exception(request, exception)
end

#on_response(request, response) ⇒ Object

Stuff to override



54
55
# File 'lib/qwirk/task.rb', line 54

def on_response(request, response)
end

#on_updateObject



60
61
# File 'lib/qwirk/task.rb', line 60

def on_update()
end

#publish(object) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/qwirk/task.rb', line 73

def publish(object)
  marshaled_object = @publisher.marshaler.marshal(object)
  @pending_hash_mutex.synchronize do
    while !@stopped && @pending_hash.size >= @max_pending_records
      @pending_hash_condition.wait(@pending_hash_mutex)
    end
    unless @stopped
      message_id = @producer.send(marshaled_object)
      @pending_hash[message_id] = object
    end
  end
end

#retry=(val) ⇒ Object



66
67
68
69
70
71
# File 'lib/qwirk/task.rb', line 66

def retry=(val)
  @retry = val
  if val
    @pending_hash_mutex.synchronize { check_retry }
  end
end

#startObject

TODO: Needed?



87
88
# File 'lib/qwirk/task.rb', line 87

def start
end

#stopObject



90
91
92
93
# File 'lib/qwirk/task.rb', line 90

def stop
  @pending_hash_mutex.synchronize { do_stop }
  @reply_thread.join
end