Class: Mutual::Messaging::ResultManager
- Inherits:
-
Object
- Object
- Mutual::Messaging::ResultManager
- Defined in:
- lib/messaging/result_manager.rb
Instance Attribute Summary collapse
-
#tasks ⇒ Object
readonly
Returns the value of attribute tasks.
Instance Method Summary collapse
- #handle_timeouts ⇒ Object
-
#initialize(messaging, configuration = {}) ⇒ ResultManager
constructor
A new instance of ResultManager.
- #listen(task, options = {}, &block) ⇒ Object
- #match(result) ⇒ Object
- #read(&block) ⇒ Object
- #run ⇒ Object
-
#wait ⇒ Object
BLOCKING!.
Constructor Details
#initialize(messaging, configuration = {}) ⇒ ResultManager
Returns a new instance of ResultManager.
7 8 9 10 11 12 13 14 15 |
# File 'lib/messaging/result_manager.rb', line 7 def initialize(messaging, configuration={}) @messaging = messaging @timeout = configuration[:timeout] || 8 # seconds # will be used as a blocking client @client = @messaging.create_redis_client @timeouts = [] @tasks = {} end |
Instance Attribute Details
#tasks ⇒ Object (readonly)
Returns the value of attribute tasks.
6 7 8 |
# File 'lib/messaging/result_manager.rb', line 6 def tasks @tasks end |
Instance Method Details
#handle_timeouts ⇒ Object
34 35 36 37 38 39 40 41 42 |
# File 'lib/messaging/result_manager.rb', line 34 def handle_timeouts now = Time.now.to_i while @timeouts[0] && @timeouts[0][0] > now expire, task_id = @timeouts.shift if listener = @tasks.delete(task_id) listener.call :timeout => true end end end |
#listen(task, options = {}, &block) ⇒ Object
22 23 24 25 26 27 28 |
# File 'lib/messaging/result_manager.rb', line 22 def listen(task, ={}, &block) now = Time.now ttl = [:timeout] || 8 expire = (now + ttl).to_i @timeouts << [expire, task.task_id] @tasks[task.task_id] = block end |
#match(result) ⇒ Object
30 31 32 |
# File 'lib/messaging/result_manager.rb', line 30 def match(result) @tasks.delete(result[:task_id]) end |
#read(&block) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/messaging/result_manager.rb', line 62 def read(&block) @thread = Thread.new do thread = Thread.current thread[:run] = true while thread[:run] if result = @messaging.pop_queue(@messaging.id) yield(result) end end end end |
#run ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/messaging/result_manager.rb', line 44 def run Thread.new do thread = Thread.current thread[:run] = true while thread[:run] sleep 0.5 handle_timeouts end end read do |result| if listener = match(result) listener.call(result) else #print "\ngot result no one was expecting: #{result.inspect}" end end end |
#wait ⇒ Object
BLOCKING!
18 19 20 |
# File 'lib/messaging/result_manager.rb', line 18 def wait @messaging.pop_queue(@messaging.id) end |