Class: Gorgon::OriginatorProtocol

Inherits:
Object
  • Object
show all
Defined in:
lib/gorgon/originator_protocol.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger, cluster_id = nil) ⇒ OriginatorProtocol

Returns a new instance of OriginatorProtocol.



9
10
11
12
13
# File 'lib/gorgon/originator_protocol.rb', line 9

def initialize(logger, cluster_id=nil)
  @originator_exchange_name = OriginatorProtocol.originator_exchange_name(cluster_id)
  @job_exchange_name = OriginatorProtocol.job_exchange_name(cluster_id)
  @logger = logger
end

Class Method Details

.job_exchange_name(cluster_id) ⇒ Object



23
24
25
26
27
28
29
# File 'lib/gorgon/originator_protocol.rb', line 23

def self.job_exchange_name(cluster_id)
  if cluster_id
    "gorgon.jobs.#{cluster_id}"
  else
    'gorgon.jobs'
  end
end

.originator_exchange_name(cluster_id) ⇒ Object



15
16
17
18
19
20
21
# File 'lib/gorgon/originator_protocol.rb', line 15

def self.originator_exchange_name(cluster_id)
  if cluster_id
    "gorgon.originators.#{cluster_id}"
  else
    "gorgon.originators"
  end
end

Instance Method Details

#append_protocol_information_to_job_definition(job_definition) ⇒ Object



56
57
58
59
60
61
62
63
# File 'lib/gorgon/originator_protocol.rb', line 56

def append_protocol_information_to_job_definition job_definition
  job_definition = job_definition.dup

  job_definition.file_queue_name = @file_queue.name
  job_definition.reply_exchange_name = @reply_exchange.name

  return job_definition
end

#cancel_jobObject



83
84
85
86
87
# File 'lib/gorgon/originator_protocol.rb', line 83

def cancel_job
  @file_queue.purge if @file_queue
  @channel.fanout("gorgon.worker_managers").publish(cancel_message) if @channel
  @logger.log "Cancel Message sent"
end

#connect(connection_information, options = {}) ⇒ Object



31
32
33
34
35
36
# File 'lib/gorgon/originator_protocol.rb', line 31

def connect connection_information, options={}
  @connection = AMQP.connect(connection_information)
  @channel = AMQP::Channel.new(@connection)
  @connection.on_closed { options[:on_closed].call } if options[:on_closed]
  open_queues
end

#disconnectObject



89
90
91
92
# File 'lib/gorgon/originator_protocol.rb', line 89

def disconnect
  cleanup_queues_and_exchange
  @connection.disconnect if @connection
end

#publish_files(files) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/gorgon/originator_protocol.rb', line 38

def publish_files files
  @file_queue = @channel.queue("file_queue_" + UUIDTools::UUID.timestamp_create.to_s, :auto_delete => true)

  files.each do |file|
    @channel.default_exchange.publish(file, :routing_key => @file_queue.name)
  end
end

#publish_job_to_all(job_definition) ⇒ Object



46
47
48
49
# File 'lib/gorgon/originator_protocol.rb', line 46

def publish_job_to_all job_definition
  job_definition = append_protocol_information_to_job_definition(job_definition)
  @channel.fanout(@job_exchange_name).publish(job_definition.to_json)
end

#publish_job_to_one(job_definition, listener_queue_name) ⇒ Object



51
52
53
54
# File 'lib/gorgon/originator_protocol.rb', line 51

def publish_job_to_one job_definition, listener_queue_name
  job_definition = append_protocol_information_to_job_definition(job_definition)
  @channel.default_exchange.publish(job_definition.to_json, :routing_key => listener_queue_name)
end

#receive_new_listener_notificationsObject



77
78
79
80
81
# File 'lib/gorgon/originator_protocol.rb', line 77

def receive_new_listener_notifications
  @originator_queue.subscribe do |payload|
    yield payload
  end
end

#receive_payloadsObject



71
72
73
74
75
# File 'lib/gorgon/originator_protocol.rb', line 71

def receive_payloads
  @reply_queue.subscribe do |payload|
    yield payload
  end
end

#send_message_to_listeners(type, body = {}) ⇒ Object



65
66
67
68
69
# File 'lib/gorgon/originator_protocol.rb', line 65

def send_message_to_listeners type, body={}
  # TODO: we probably want to use a different exchange for this type of messages
  message = {:type => type, :reply_exchange_name => @reply_exchange.name, :body => body}
  @channel.fanout(@job_exchange_name).publish(Yajl::Encoder.encode(message))
end