Class: Mutual::Messaging::ResultManager

Inherits:
Object
  • Object
show all
Defined in:
lib/messaging/result_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(messaging, configuration = {}) ⇒ ResultManager

Returns a new instance of ResultManager.



7
8
9
10
11
12
13
14
15
# File 'lib/messaging/result_manager.rb', line 7

# Builds a result manager bound to the given messaging object.
#
# @param messaging [Object] must respond to +create_redis_client+; the other
#   methods of this class also call +pop_queue+ and +id+ on it
# @param configuration [Hash] optional settings; +:timeout+ (seconds) is
#   stored in @timeout and defaults to 8 when absent or falsy
def initialize(messaging, configuration={})
  requested_timeout = configuration[:timeout]

  @messaging = messaging
  @timeout   = requested_timeout ? requested_timeout : 8 # seconds

  # Pending deadlines as [expire, task_id] pairs, and listeners by task id.
  @timeouts = []
  @tasks    = {}

  # Dedicated client, intended for blocking use.
  @client = messaging.create_redis_client
end

Instance Attribute Details

#tasks ⇒ Object (readonly)

Returns the value of attribute tasks.



6
7
8
# File 'lib/messaging/result_manager.rb', line 6

# Read-only access to the listener table.
#
# @return [Hash] the value of @tasks, which #listen fills with
#   task id => listener block entries
def tasks
  @tasks
end

Instance Method Details

#handle_timeouts ⇒ Object



34
35
36
37
38
39
40
41
42
# File 'lib/messaging/result_manager.rb', line 34

# Fires the timeout callback for every listener whose deadline has passed.
#
# Each due entry is removed from @timeouts. If its listener is still present
# in @tasks (i.e. #match has not already claimed it), the listener is removed
# and called with +:timeout => true+.
#
# @return [nil]
def handle_timeouts
  now = Time.now.to_i
  expired_ids = []

  # An entry is due once its deadline is at or before the current time;
  # entries with a deadline still in the future must be left alone.
  # The whole list is scanned rather than only the head because #listen
  # appends entries with per-call TTLs, so @timeouts is not ordered by
  # deadline.
  @timeouts.reject! do |expire, task_id|
    next false if expire > now
    expired_ids << task_id
    true
  end

  expired_ids.each do |task_id|
    listener = @tasks.delete(task_id)
    listener.call(:timeout => true) if listener
  end

  nil
end

#listen(task, options = {}, &block) ⇒ Object



22
23
24
25
26
27
28
# File 'lib/messaging/result_manager.rb', line 22

# Registers a listener block for a task, together with a timeout deadline.
#
# The deadline is stored in @timeouts as an [expire, task_id] pair (expire in
# whole epoch seconds) and the block is stored in @tasks under the task id.
#
# @param task [#task_id] the task whose result is awaited
# @param options [Hash] +:timeout+ overrides the TTL in seconds for this call
# @yield invoked later with the result, or with +:timeout => true+
# @return [Proc, nil] the block that was registered
def listen(task, options={}, &block)
  # Fall back to the instance-wide timeout set in #initialize instead of a
  # second hard-coded default; 8 remains the last resort if @timeout is unset.
  ttl = options[:timeout] || @timeout || 8
  task_id = task.task_id

  @timeouts << [(Time.now + ttl).to_i, task_id]
  @tasks[task_id] = block
end

#match(result) ⇒ Object



30
31
32
# File 'lib/messaging/result_manager.rb', line 30

# Claims the listener registered for a result, removing it from @tasks.
#
# @param result [Hash] looked up by its +:task_id+ entry
# @return [Object, nil] the stored listener, or nil if none was registered
def match(result)
  task_id = result[:task_id]
  @tasks.delete(task_id)
end

#read(&block) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
# File 'lib/messaging/result_manager.rb', line 62

# Starts a thread that repeatedly pops results from the queue identified by
# +@messaging.id+ and yields each truthy result to the given block.
#
# The loop runs while the thread-local +:run+ flag on that thread is truthy.
# The thread is also stored in @thread.
#
# @yield [result] each truthy value returned by +@messaging.pop_queue+
# @return [Thread] the reader thread
def read(&block)
  @thread = Thread.new do
    reader = Thread.current
    reader[:run] = true
    while reader[:run]
      result = @messaging.pop_queue(@messaging.id)
      yield(result) if result
    end
  end
end

#run ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/messaging/result_manager.rb', line 44

# Starts the two background loops of the manager.
#
# One thread calls #handle_timeouts every 0.5 seconds while its thread-local
# +:run+ flag is truthy. The other is the reader started by #read, which
# passes each incoming result to the listener returned by #match; results
# with no matching listener are dropped.
#
# @return [Object] whatever #read returns
def run
  Thread.new do
    ticker = Thread.current
    ticker[:run] = true
    while ticker[:run]
      sleep 0.5
      handle_timeouts
    end
  end

  read do |result|
    listener = match(result)
    # A result nobody is waiting for is ignored.
    listener.call(result) if listener
  end
end

#wait ⇒ Object

BLOCKING!



18
19
20
# File 'lib/messaging/result_manager.rb', line 18

# Pops one entry from the queue identified by +@messaging.id+.
#
# @return [Object] whatever +@messaging.pop_queue+ returns
def wait
  queue_id = @messaging.id
  @messaging.pop_queue(queue_id)
end