Class: OmfCommon::Comm::AMQP::Communicator

Inherits:
OmfCommon::Comm show all
Defined in:
lib/omf_common/comm/amqp/amqp_communicator.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from OmfCommon::Comm

init, instance, #local_address, #local_topic, #on_interrupted, #options, #subscribe

Instance Attribute Details

#channelObject (readonly)

end



30
31
32
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 30

def channel
  @channel
end

Instance Method Details

#broadcast_file(file_path, topic_name = nil, opts = {}, &block) ⇒ Object



114
115
116
117
118
119
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 114

def broadcast_file(file_path, topic_name = nil, opts = {}, &block)
  topic_name ||= SecureRandom.uuid
  require 'omf_common/comm/amqp/amqp_file_transfer'
  OmfCommon::Comm::AMQP::FileBroadcaster.new(file_path, @channel, topic_name, opts, &block)
  "bdcst:#{@address_prefix + topic_name}"
end

#conn_infoObject



51
52
53
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 51

def conn_info
  { proto: :amqp, user: ::AMQP.settings[:user], domain: ::AMQP.settings[:host] }
end

#create_topic(topic, opts = {}) ⇒ Object

Create a new pubsub topic with additional configuration

Parameters:

  • topic (String)

    Pubsub topic name



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 84

def create_topic(topic, opts = {})
  raise "Topic can't be nil or empty" if topic.nil? || topic.to_s.empty?
  opts = opts.dup
  opts[:communicator] = self
  topic = topic.to_s
  if topic.start_with? 'amqp:'
    # absolute address
    unless topic.start_with? @address_prefix
      raise "Cannot subscribe to a topic from different domain (#{topic}) - #{@address_prefix}"
    end
    opts[:address] = topic
    topic = topic.split(@address_prefix).last
  else
    opts[:address] = @address_prefix + topic
  end
  OmfCommon::Comm::AMQP::Topic.create(topic, opts)
end

#delete_topic(topic, &block) ⇒ Object

Delete a pubsub topic

Parameters:

  • topic (String)

    Pubsub topic name



105
106
107
108
109
110
111
112
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 105

def delete_topic(topic, &block)
  # FIXME CommProvider?
  if t = OmfCommon::CommProvider::AMQP::Topic.find(topic)
    t.release
  else
    warn "Attempt to delete unknown topic '#{topic}"
  end
end

#disconnect(opts = {}) ⇒ Object

Shut down comms layer



60
61
62
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 60

def disconnect(opts = {})
  info "Disconnecting..."
end

#init(opts = {}) ⇒ Object

Initialize comms layer



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 34

def init(opts = {})
  @opts = {
    #:ssl (Hash) TLS (SSL) parameters to use.
    heartbeat: 20, # (Fixnum) - default: 0 Connection heartbeat, in seconds. 0 means no heartbeat. Can also be configured server-side starting with RabbitMQ 3.0.
    #:on_tcp_connection_failure (#call) - A callable object that will be run if connection to server fails
    #:on_possible_authentication_failure (#call) - A callable object that will be run if authentication fails (see Authentication failure section)
    reconnect_delay: 20 # (Fixnum) - Delay in seconds before attempting reconnect on detected failure
  }.merge(opts)

  unless (@url = @opts.delete(:url))
    raise "Missing 'url' option for AQMP layer"
  end
  @address_prefix = @url + '/frcp.'
  _connect()
  super
end

#on_connected(&block) ⇒ Object

TODO: Should be thread safe and check if already connected



65
66
67
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 65

def on_connected(&block)
  @on_connected_procs << block
end

#on_reconnect(key, &block) ⇒ Object

register callbacks to be called when the underlying AMQP layer needs to reconnect to the AMQP server. This may require some additional repairs. If ‘block’ is nil, the callback is removed



73
74
75
76
77
78
79
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 73

def on_reconnect(key, &block)
  if block.nil?
    @on_reconnect.delete(key)
  else
    @on_reconnect[key] = block
  end
end

#receive_file(topic_url, file_path = nil, opts = {}, &block) ⇒ Object



121
122
123
124
125
126
127
128
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 121

def receive_file(topic_url, file_path = nil, opts = {}, &block)
  if topic_url.start_with? @address_prefix
    topic_url = topic_url[@address_prefix.length .. -1]
  end
  require 'omf_common/comm/amqp/amqp_file_transfer'
  file_path ||= File.join(Dir.tmpdir, Dir::Tmpname.make_tmpname('bdcast', '.xxx'))
  FileReceiver.new(file_path, @channel, topic_url, opts, &block)
end

#string_to_topic_address(a_string) ⇒ Object



55
56
57
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 55

def string_to_topic_address(a_string)
  @address_prefix+a_string
end