Class: Mutual::Messaging
- Inherits:
-
Object
- Object
- Mutual::Messaging
- Extended by:
- Incrementable
- Includes:
- Incrementable
- Defined in:
- lib/mutual.rb,
lib/messaging/task_manager.rb,
lib/messaging/result_manager.rb
Defined Under Namespace
Classes: ResultManager, Task, TaskManager
Instance Attribute Summary collapse
-
#configuration ⇒ Object
readonly
Returns the value of attribute configuration.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#result_manager ⇒ Object
readonly
Returns the value of attribute result_manager.
-
#task_manager ⇒ Object
readonly
Returns the value of attribute task_manager.
Instance Method Summary collapse
- #create_redis_client ⇒ Object
- #generate_id ⇒ Object
-
#initialize(process_name, configuration) ⇒ Messaging
constructor
A new instance of Messaging.
- #multi_task(*args) ⇒ Object
-
#pop_queue(queue) ⇒ Object
WARNING: blocking call.
- #push_queue(queue, object) ⇒ Object
- #send_result(*args) ⇒ Object
- #send_task(options, &block) ⇒ Object
Methods included from Incrementable
Constructor Details
#initialize(process_name, configuration) ⇒ Messaging
Returns a new instance of Messaging.
21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/mutual.rb', line 21 def initialize(process_name, configuration) @process_name = process_name @configuration = configuration @id = "#{@process_name}-#{self.class.increment!}" @pop_timeout = 8 @task_manager = TaskManager.new(self) @result_manager = ResultManager.new(self) @client = create_redis_client @clients = {} end |
Instance Attribute Details
#configuration ⇒ Object (readonly)
Returns the value of attribute configuration.
19 20 21 |
# File 'lib/mutual.rb', line 19 def configuration @configuration end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
19 20 21 |
# File 'lib/mutual.rb', line 19 def id @id end |
#result_manager ⇒ Object (readonly)
Returns the value of attribute result_manager.
19 20 21 |
# File 'lib/mutual.rb', line 19 def result_manager @result_manager end |
#task_manager ⇒ Object (readonly)
Returns the value of attribute task_manager.
19 20 21 |
# File 'lib/mutual.rb', line 19 def task_manager @task_manager end |
Instance Method Details
#create_redis_client ⇒ Object
38 39 40 |
# File 'lib/mutual.rb', line 38 def create_redis_client Redis.new(:host => @configuration[:host], :port => @configuration[:port]) end |
#generate_id ⇒ Object
34 35 36 |
# File 'lib/mutual.rb', line 34 def generate_id "#{@id}-#{increment!}" end |
#multi_task(*args) ⇒ Object
50 51 52 |
# File 'lib/mutual.rb', line 50 def multi_task(*args) raise "unimplemented" end |
#pop_queue(queue) ⇒ Object
WARNING: blocking call.
64 65 66 67 68 69 70 |
# File 'lib/mutual.rb', line 64 def pop_queue(queue) @clients[queue] ||= create_redis_client if result = @clients[queue].brpop(queue, :timeout => @pop_timeout) _channel, message = result JSON.parse(message, :symbolize_names => true) end end |
#push_queue(queue, object) ⇒ Object
58 59 60 61 |
# File 'lib/mutual.rb', line 58 def push_queue(queue, object) message = object.to_json @client.lpush(queue, message) end |
#send_result(*args) ⇒ Object
54 55 56 |
# File 'lib/mutual.rb', line 54 def send_result(*args) raise "unimplemented" end |
#send_task(options, &block) ⇒ Object
42 43 44 45 46 47 48 |
# File 'lib/mutual.rb', line 42 def send_task(options, &block) task = @task_manager.create(options) if block @result_manager.listen(task, &block) end @task_manager.send(task) end |