Class: OriginatorProtocol

Inherits:
Object
  • Object
show all
Defined in:
lib/gorgon/originator_protocol.rb

Instance Method Summary collapse

Constructor Details

#initialize(logger) ⇒ OriginatorProtocol

Returns a new instance of OriginatorProtocol.



8
9
10
# File 'lib/gorgon/originator_protocol.rb', line 8

# Builds a protocol object that reports through the given logger.
#
# @param logger the object stored in @logger; elsewhere in this class
#   it is sent `log` with a String message
def initialize(logger)
  @logger = logger
end

Instance Method Details

#cancel_job ⇒ Object



46
47
48
49
50
# File 'lib/gorgon/originator_protocol.rb', line 46

# Cancels the current job.
#
# Purges the file queue when one exists, publishes the cancel message on
# the "gorgon.worker_managers" fanout exchange when a channel exists, and
# then logs "Cancel Message sent". The log line is written on every call,
# including when there was no channel to publish on.
#
# @return the value returned by @logger.log
def cancel_job
  @file_queue.purge if @file_queue

  if @channel
    worker_managers = @channel.fanout("gorgon.worker_managers")
    worker_managers.publish(cancel_message)
  end

  @logger.log("Cancel Message sent")
end

#connect(connection_information, options = {}) ⇒ Object



12
13
14
15
16
17
# File 'lib/gorgon/originator_protocol.rb', line 12

# Opens the AMQP connection and channel, then opens the queues.
#
# @param connection_information passed unchanged to AMQP.connect
# @param options [Hash] when options[:on_closed] is set, it is called
#   (looked up again at that moment) each time the connection's
#   on_closed callback fires
# @return the value returned by open_queues
def connect(connection_information, options = {})
  @connection = AMQP.connect(connection_information)
  @channel = AMQP::Channel.new(@connection)

  if options[:on_closed]
    @connection.on_closed { options[:on_closed].call }
  end

  open_queues
end

#disconnect ⇒ Object



52
53
54
55
# File 'lib/gorgon/originator_protocol.rb', line 52

# Tears down the messaging resources.
#
# Always runs cleanup_queues_and_exchange first, then disconnects the
# connection if one has been stored in @connection.
#
# @return nil when there is no connection, otherwise the value returned
#   by @connection.disconnect
def disconnect
  cleanup_queues_and_exchange
  return unless @connection

  @connection.disconnect
end

#publish_files(files) ⇒ Object



19
20
21
22
23
24
25
# File 'lib/gorgon/originator_protocol.rb', line 19

# Creates the per-run file queue and fills it with the given files.
#
# The queue is declared on @channel with :auto_delete => true and a name
# of the form "file_queue_<timestamp UUID>", and is kept in @file_queue.
# Each entry of +files+ is then published on the channel's default
# exchange with the queue name as routing key.
#
# @param files a collection responding to each; every element is
#   published as one message
# @return the value returned by files.each
def publish_files(files)
  queue_name = "file_queue_#{UUIDTools::UUID.timestamp_create}"
  @file_queue = @channel.queue(queue_name, :auto_delete => true)

  files.each do |path|
    @channel.default_exchange.publish(path, :routing_key => @file_queue.name)
  end
end

#publish_job(job_definition) ⇒ Object



27
28
29
30
31
32
# File 'lib/gorgon/originator_protocol.rb', line 27

# Announces a job on the "gorgon.jobs" fanout exchange.
#
# Before publishing, the job definition is stamped with the names of the
# current file queue and reply exchange, so both @file_queue and
# @reply_exchange must already be set.
#
# @param job_definition an object with file_queue_name= and
#   reply_exchange_name= writers and a to_json method
# @return the value returned by the exchange's publish call
def publish_job(job_definition)
  job_definition.file_queue_name = @file_queue.name
  job_definition.reply_exchange_name = @reply_exchange.name

  jobs_exchange = @channel.fanout("gorgon.jobs")
  jobs_exchange.publish(job_definition.to_json)
end

#receive_payloads ⇒ Object



40
41
42
43
44
# File 'lib/gorgon/originator_protocol.rb', line 40

# Subscribes to the reply queue and hands each payload to the caller's
# block.
#
# The block given to subscribe deliberately takes exactly one argument,
# so subscribe sees a one-parameter block whatever the caller's block
# looks like.
#
# @yield [payload] each payload delivered by @reply_queue.subscribe
# @return the value returned by @reply_queue.subscribe
def receive_payloads
  @reply_queue.subscribe { |message| yield message }
end

#send_message_to_listeners(type, body = {}) ⇒ Object



34
35
36
37
38
# File 'lib/gorgon/originator_protocol.rb', line 34

# Broadcasts a typed message on the "gorgon.jobs" fanout exchange.
#
# The message is a hash with :type, :reply_exchange_name (taken from
# @reply_exchange) and :body, encoded with Yajl::Encoder before it is
# published.
#
# @param type the value placed under :type
# @param body the value placed under :body (defaults to an empty Hash)
# @return the value returned by the exchange's publish call
def send_message_to_listeners(type, body = {})
  # TODO: we probably want to use a different exchange for this type of messages
  message = {
    :type => type,
    :reply_exchange_name => @reply_exchange.name,
    :body => body
  }

  jobs_exchange = @channel.fanout("gorgon.jobs")
  jobs_exchange.publish(Yajl::Encoder.encode(message))
end