Class: Mutual::Messaging

Inherits:
Object
  • Object
show all
Extended by:
Incrementable
Includes:
Incrementable
Defined in:
lib/mutual.rb,
lib/messaging/task_manager.rb,
lib/messaging/result_manager.rb

Defined Under Namespace

Classes: ResultManager, Task, TaskManager

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Incrementable

increment!

Constructor Details

#initialize(process_name, configuration) ⇒ Messaging

Returns a new instance of Messaging.



21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/mutual.rb', line 21

# Sets up a messaging endpoint for one process.
#
# @param process_name [#to_s] prefix of the identifier assigned to this instance
# @param configuration [#[]] settings object; its +:host+ and +:port+ entries
#   are read whenever a Redis client is created
def initialize(process_name, configuration)
  @process_name = process_name
  @configuration = configuration
  # The class-level counter is appended to the process name to form the id.
  @id = format("%s-%s", @process_name, self.class.increment!)

  # Handed to brpop as its :timeout option by pop_queue.
  @pop_timeout = 8

  # Both managers receive this instance, so they are built only after the
  # identity fields above have been assigned.
  @task_manager = TaskManager.new(self)
  @result_manager = ResultManager.new(self)

  # @client is the connection used by push_queue; @clients holds the
  # per-queue connections that pop_queue creates on demand.
  @client = create_redis_client
  @clients = {}
end

Instance Attribute Details

#configuration ⇒ Object (readonly)

Returns the value of attribute configuration.



19
20
21
# File 'lib/mutual.rb', line 19

# @return [Object] the configuration object stored by the constructor
def configuration; @configuration; end

#id ⇒ Object (readonly)

Returns the value of attribute id.



19
20
21
# File 'lib/mutual.rb', line 19

# @return [Object] the identifier stored in @id
def id; @id; end

#result_manager ⇒ Object (readonly)

Returns the value of attribute result_manager.



19
20
21
# File 'lib/mutual.rb', line 19

# @return [Object] the result manager stored in @result_manager
def result_manager; @result_manager; end

#task_manager ⇒ Object (readonly)

Returns the value of attribute task_manager.



19
20
21
# File 'lib/mutual.rb', line 19

# @return [Object] the task manager stored in @task_manager
def task_manager; @task_manager; end

Instance Method Details

#create_redis_client ⇒ Object



38
39
40
# File 'lib/mutual.rb', line 38

# Builds a new Redis client from the +:host+ and +:port+ entries of the
# stored configuration. Every call constructs a separate client.
#
# @return [Redis] the newly constructed client
def create_redis_client
  host = @configuration[:host]
  port = @configuration[:port]
  Redis.new(host: host, port: port)
end

#generate_id ⇒ Object



34
35
36
# File 'lib/mutual.rb', line 34

# Produces an identifier made of this instance's id, a dash, and the value
# returned by the instance-level +increment!+.
#
# @return [String] the composed identifier
def generate_id
  format("%s-%s", @id, increment!)
end

#multi_task(*args) ⇒ Object



50
51
52
# File 'lib/mutual.rb', line 50

# Placeholder: not implemented yet. Accepts any arguments and always raises.
#
# @raise [RuntimeError] with the message "unimplemented"
def multi_task(*args)
  raise RuntimeError, "unimplemented"
end

#pop_queue(queue) ⇒ Object

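WARNING: blocking call. Pops from the queue with a timeout (the pop timeout, set to 8 in the constructor) and returns nil when the pop yields nothing; otherwise returns the message parsed from JSON with symbolized keys.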



64
65
66
67
68
69
70
# File 'lib/mutual.rb', line 64

# Pops one message from +queue+ and decodes it from JSON.
#
# WARNING: blocking call. The pop is issued through +brpop+ with
# @pop_timeout as its :timeout option.
#
# A separate client is created per queue on first use and cached in @clients.
#
# @param queue [Object] name of the queue to pop from; also the cache key
# @return [Object, nil] the parsed message with symbolized keys, or nil when
#   +brpop+ returns nothing
def pop_queue(queue)
  client = (@clients[queue] ||= create_redis_client)
  popped = client.brpop(queue, timeout: @pop_timeout)
  return unless popped

  # The reply destructures into a first element (unused) and the payload.
  _source, payload = popped
  JSON.parse(payload, symbolize_names: true)
end

#push_queue(queue, object) ⇒ Object



58
59
60
61
# File 'lib/mutual.rb', line 58

# Serializes +object+ with +to_json+ and pushes it onto +queue+ via +lpush+
# on the shared client.
#
# @param queue [Object] name of the queue to push onto
# @param object [#to_json] payload to serialize
# @return [Object] whatever the client's +lpush+ returns
def push_queue(queue, object)
  @client.lpush(queue, object.to_json)
end

#send_result(*args) ⇒ Object



54
55
56
# File 'lib/mutual.rb', line 54

# Placeholder: not implemented yet. Accepts any arguments and always raises.
#
# @raise [RuntimeError] with the message "unimplemented"
def send_result(*args)
  raise RuntimeError, "unimplemented"
end

#send_task(options, &block) ⇒ Object



42
43
44
45
46
47
48
# File 'lib/mutual.rb', line 42

# Creates a task from +options+ and sends it through the task manager.
#
# When a block is given it is passed to the result manager's +listen+ for
# the new task before the task is sent.
#
# @param options [Object] passed unchanged to the task manager's +create+
# @yield optional block forwarded to the result manager's +listen+
# @return [Object] whatever the task manager's +send+ returns
def send_task(options, &block)
  task = @task_manager.create(options)
  @result_manager.listen(task, &block) if block
  @task_manager.send(task)
end