Class: Vx::Common::AMQP::Session

Inherits:
Object
  • Object
show all
Includes:
Instrument
Defined in:
lib/vx/common/amqp/session.rb

Defined Under Namespace

Classes: ConnectionDoesNotExist

Constant Summary collapse

CHANNEL_KEY = :vx_amqp_channel
@@session_lock = Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Instrument

#instrument

Instance Attribute Details

#conn ⇒ Object (readonly)

Returns the value of attribute conn.



15
16
17
# File 'lib/vx/common/amqp/session.rb', line 15

# Reader for the underlying connection object held in @conn.
#
# Within this class @conn is assigned a Bunny connection by #open and reset
# to nil by #close, so callers must be prepared for a nil result.
def conn
  @conn
end

Class Method Details

.resume ⇒ Object



26
27
28
# File 'lib/vx/common/amqp/session.rb', line 26

# Clears the class-level shutdown flag so that .shutdown? reports false.
# #open calls this (via self.class.resume) every time it connects.
def resume
  @shutdown = false
end

.shutdown ⇒ Object



18
19
20
# File 'lib/vx/common/amqp/session.rb', line 18

# Sets the class-level shutdown flag; .shutdown? returns true from now on
# until .resume clears the flag again.
def shutdown
  @shutdown = true
end

.shutdown? ⇒ Boolean

Returns:

  • (Boolean)


22
23
24
# File 'lib/vx/common/amqp/session.rb', line 22

# Reports whether the class-level shutdown flag is currently set.
#
# The explicit comparison with true makes a never-assigned (nil) flag yield
# false instead of nil, so the result is always a strict boolean.
#
# @return [Boolean]
def shutdown?
  @shutdown == true
end

Instance Method Details

#channel ⇒ Object



97
98
99
100
101
# File 'lib/vx/common/amqp/session.rb', line 97

# Channel to use for AMQP operations issued from the calling thread.
#
# A channel stored in the current thread's locals under CHANNEL_KEY (this is
# what #with_channel installs) takes precedence; otherwise the connection's
# default channel is returned.
#
# assert_connection_is_open runs first; it is defined elsewhere and is
# expected to reject a session that has not been opened.
def channel
  assert_connection_is_open

  thread_channel = Thread.current[CHANNEL_KEY]
  return thread_channel if thread_channel

  conn.default_channel
end

#close ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/vx/common/amqp/session.rb', line 31

# Closes the underlying connection and forgets it (@conn becomes nil).
#
# Does nothing when the session is not open. A Bunny::ChannelError raised
# while closing is reported with +warn+ and otherwise ignored, so closing is
# best-effort.
#
# @return [nil]
def close
  # Unlocked fast path: an already-closed session never contends for the lock.
  return unless open?

  @@session_lock.synchronize do
    # Re-check under the lock: another thread may have closed the session
    # (and reset @conn to nil) while this one was waiting for the mutex.
    next unless open?

    connection = conn

    begin
      connection.close
    rescue Bunny::ChannelError => e
      warn e
    end

    # Poll until the connection reports :closed, but give up after roughly
    # five seconds (500 * 10 ms). NOTE(review): when close raised above the
    # status presumably may never reach :closed; an unbounded wait here would
    # then hang every session, because the class-wide lock is still held.
    500.times do
      break if connection.status == :closed
      sleep 0.01
    end

    @conn = nil
  end
end

#config ⇒ Object



130
131
132
# File 'lib/vx/common/amqp/session.rb', line 130

# Shortcut to the configuration object exposed by Common::AMQP.config.
# Other methods of this class read settings such as +url+ and
# +default_exchange_name+ from it.
def config
  Common::AMQP.config
end

#conn_info ⇒ Object



117
118
119
120
121
# File 'lib/vx/common/amqp/session.rb', line 117

# Describes the current connection as an AMQP URI built from its user, host,
# port and vhost (the password is not part of the string).
#
# @return [String, nil] nil when there is no connection object
def conn_info
  return unless conn

  "amqp://#{conn.user}@#{conn.host}:#{conn.port}/#{conn.vhost}"
end

#declare_exchange(name, options = nil) ⇒ Object



78
79
80
81
82
83
84
85
86
# File 'lib/vx/common/amqp/session.rb', line 78

# Declares an exchange and returns whatever the channel's +exchange+ call
# returns.
#
# @param name [String, nil] exchange name; falls back to
#   config.default_exchange_name when nil
# @param options [Hash, nil] exchange options. A :channel entry selects the
#   channel to declare on (default: #channel). The remaining entries are
#   handed to get_exchange_type_and_options, which is defined elsewhere and
#   yields the exchange type plus the final option hash.
#
# The caller's hash is never modified: the method works on a shallow copy,
# so the same (possibly frozen) options hash can be reused across calls
# without losing its :channel entry.
def declare_exchange(name, options = nil)
  assert_connection_is_open

  # Copy before delete(:channel) so the caller's hash stays intact.
  options    = options ? options.dup : {}
  name     ||= config.default_exchange_name
  ch         = options.delete(:channel) || channel
  type, opts = get_exchange_type_and_options(options)
  ch.exchange(name, opts.merge(type: type))
end

#declare_queue(name, options = nil) ⇒ Object



88
89
90
91
92
93
94
95
# File 'lib/vx/common/amqp/session.rb', line 88

# Declares a queue and returns whatever the channel's +queue+ call returns.
#
# @param name [String, nil] queue name, passed through
#   get_queue_name_and_options (defined elsewhere), which yields the final
#   name and option hash
# @param options [Hash, nil] queue options. A :channel entry selects the
#   channel to declare on (default: #channel).
#
# The caller's hash is never modified: the method works on a shallow copy,
# so the same (possibly frozen) options hash can be reused across calls
# without losing its :channel entry.
def declare_queue(name, options = nil)
  assert_connection_is_open

  # Copy before delete(:channel) so the caller's hash stays intact.
  options    = options ? options.dup : {}
  ch         = options.delete(:channel) || channel
  name, opts = get_queue_name_and_options(name, options)
  ch.queue(name, opts)
end

#open ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/vx/common/amqp/session.rb', line 47

# Opens the AMQP connection unless the session is already open.
#
# While holding the class-wide session lock this:
# 1. clears the class-level shutdown flag (self.class.resume),
# 2. builds the Bunny connection from config.url on first use,
# 3. emits "start_connecting.consumer.amqp", then wraps the actual connect
#    in a "connect.consumer.amqp" instrumentation block.
#
# @return [self] so calls can be chained
def open
  unless open?
    @@session_lock.synchronize do
      self.class.resume

      @conn ||= Bunny.new(config.url, heartbeat: :server)

      payload = { info: conn_info }

      instrument("start_connecting.consumer.amqp", payload)

      instrument("connect.consumer.amqp", payload) do
        # The connection may already be up if another thread won the race.
        next if conn.open?

        conn.start
        # Poll until the connection leaves the connecting state.
        sleep 0.01 while conn.connecting?
      end
    end
  end

  self
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


74
75
76
# File 'lib/vx/common/amqp/session.rb', line 74

# Reports whether a connection exists, says it is open and has status :open.
#
# The connection is read once into a local. #close resets @conn to nil from
# whichever thread calls it, and this predicate runs without the session
# lock; re-reading the attribute for every clause could therefore hit nil
# halfway through the expression and raise NoMethodError.
#
# @return [Boolean, nil] nil when there is no connection object at all
def open?
  connection = conn
  connection && connection.open? && connection.status == :open
end

#server_name ⇒ Object



123
124
125
126
127
128
# File 'lib/vx/common/amqp/session.rb', line 123

# Broker identification in the form "product/version", taken from the
# connection's server properties.
#
# Missing properties (or a missing property hash) render as empty strings,
# so the result can be as short as "/".
#
# @return [String, nil] nil when there is no connection object
def server_name
  return unless conn

  properties = conn.server_properties || {}
  product    = properties["product"]
  version    = properties["version"]
  "#{product}/#{version}"
end

#with_channel ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/vx/common/amqp/session.rb', line 103

# Runs the given block with a freshly created channel installed as the
# calling thread's current channel (the one #channel returns).
#
# Afterwards the thread's previous channel is put back and the temporary
# channel is closed if it is still open, whether the block returned or
# raised. Calls can therefore be nested.
#
# @yield with the temporary channel active for the current thread
# @return [Object] the block's return value
def with_channel
  assert_connection_is_open

  # Capture the previous channel and create the new one BEFORE entering the
  # protected region. If channel creation fails nothing has been changed yet,
  # so there is nothing to restore and an outer channel stays in place.
  previous = Thread.current[CHANNEL_KEY]
  scoped   = conn.create_channel

  begin
    Thread.current[CHANNEL_KEY] = scoped
    yield
  ensure
    Thread.current[CHANNEL_KEY] = previous
    scoped.close if scoped && scoped.open?
  end
end