Class: Factor::Runtime::MessageBus

Inherits:
Object
  • Object
show all
Defined in:
lib/runtime/message_bus.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host = "queue.factor.io") ⇒ MessageBus

Returns a new instance of MessageBus.



9
10
11
# File 'lib/runtime/message_bus.rb', line 9

# Builds a message bus that remembers which host to talk to.
# Nothing is connected here; only the host name is stored in @host.
#
# @param host [String] host name kept for later use (defaults to "queue.factor.io")
def initialize(host = "queue.factor.io")
  @host = host
end

Instance Attribute Details

#channel ⇒ Object

Returns the value of attribute channel.



7
8
9
# File 'lib/runtime/message_bus.rb', line 7

# Reader for the channel attribute.
#
# @return [Object, nil] the current value of @channel; nil if it has not been
#   assigned yet (it is assigned in #start)
def channel
  @channel
end

#connection ⇒ Object

Returns the value of attribute connection.



7
8
9
# File 'lib/runtime/message_bus.rb', line 7

# Reader for the connection attribute.
#
# @return [Object, nil] the current value of @connection; nil if it has not
#   been assigned yet (it is assigned in #start)
def connection
  @connection
end

#exchange ⇒ Object

Returns the value of attribute exchange.



7
8
9
# File 'lib/runtime/message_bus.rb', line 7

# Reader for the exchange attribute.
#
# @return [Object, nil] the current value of @exchange; nil if it has not been
#   assigned yet (it is assigned in #start)
def exchange
  @exchange
end

#host ⇒ Object

Returns the value of attribute host.



7
8
9
# File 'lib/runtime/message_bus.rb', line 7

# Reader for the host attribute.
#
# @return [Object, nil] the current value of @host (assigned in #initialize)
def host
  @host
end

#queue ⇒ Object

Returns the value of attribute queue.



7
8
9
# File 'lib/runtime/message_bus.rb', line 7

# Reader for the queue attribute.
#
# @return [Object, nil] the current value of @queue; nil if it has not been
#   assigned yet (it is assigned in #listen)
def queue
  @queue
end

Instance Method Details

#close ⇒ Object



49
50
51
52
# File 'lib/runtime/message_bus.rb', line 49

# Closes @connection, handing it a block that calls EventMachine.stop.
# When that block is invoked is decided by the connection object
# (presumably once the close has completed -- confirm against the AMQP client).
#
# @return [Object] whatever @connection.close returns
def close
  @connection.close do
    EventMachine.stop
  end
end

#listen(routing_key = "#", &code) ⇒ Object

Creates a new queue, binds it to the topic exchange, and passes each received message to the given block.



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/runtime/message_bus.rb', line 26

# Declares a new, randomly named queue on @channel, binds it to @exchange
# with the given routing key and subscribes to it. Each delivery is wrapped
# in a Message (populated through Message#from_queue with the delivery's
# routing key and payload) and handed to the supplied block.
#
# The queue is stored in @queue, replacing any queue from an earlier call.
#
# @param routing_key [String] binding key for the queue; the default "#"
#   matches every routing key under AMQP topic matching
# @yieldparam message [Message] the message built from the delivery
# @return [Object] whatever @queue.subscribe returns
def listen(routing_key = "#", &code)
  queue_name = SecureRandom.hex

  # The name is random and never reused, so a queue that outlives its consumer
  # can never be found again. Declaring it :auto_delete lets the broker drop
  # it once the consumer is gone, instead of leaving an orphan whose binding
  # would also keep the auto-delete exchange alive.
  @queue = @channel.queue(queue_name, :auto_delete => true)
  @queue.bind(@exchange, :routing_key => routing_key) # bind queue to the exchange

  @queue.subscribe do |headers, payload|
    message = Message.new
    message.from_queue headers.routing_key, payload
    code.call(message)
  end
end

#send(message) ⇒ Object



38
39
40
# File 'lib/runtime/message_bus.rb', line 38

# Publishes a message on @exchange, using the message's payload as the body
# and the message's route as the routing key.
#
# NOTE: this definition shadows Object#send on the receiver; use __send__
# when dynamic dispatch is needed on such an object.
#
# @param message [#payload, #route] the message to publish
# @return [Object] whatever @exchange.publish returns
def send(message)
  body = message.payload
  key = message.route
  @exchange.publish(body, :routing_key => key)
end

#send_and_close(message) ⇒ Object



42
43
44
45
46
47
# File 'lib/runtime/message_bus.rb', line 42

# Sends the message, then schedules #close through an EventMachine timer
# with an interval of 1 (EM.add_timer takes seconds), rather than closing
# immediately.
#
# @param message [Object] the message passed straight to #send
# @return [Object] whatever EM.add_timer returns
def send_and_close(message)
  send(message)

  shutdown = Proc.new { close }
  EM.add_timer(1, shutdown)
end

#start(topic = "workflow", &code) ⇒ Object

Creates the connection and a topic exchange. An exchange is the place messages are sent to; it routes each message to the bound queues based on the message's routing key.



16
17
18
19
20
21
22
23
# File 'lib/runtime/message_bus.rb', line 16

# Runs the EventMachine reactor and, inside it, connects to @host, opens a
# channel on that connection, declares an auto-delete topic exchange with the
# given name, and then calls the supplied block.
#
# Stores the results in @connection, @channel and @exchange.
#
# @param topic [String] name of the topic exchange (defaults to "workflow")
# @yield called with no arguments once the exchange has been declared
# @return [Object] whatever EventMachine.run returns
def start(topic = "workflow", &code)
  boot = proc do
    @connection = AMQP.connect(:host => @host)
    @channel = AMQP::Channel.new(connection)
    @exchange = @channel.topic(topic, :auto_delete => true) # new topic exchange
    code.call
  end

  EventMachine.run(&boot)
end