Class: Msgr::Client
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
Instance Method Summary collapse
- #connect ⇒ Object
-
#drain ⇒ Object
Purge all queues known to Msgr, if they exist.
-
#initialize(config = {}) ⇒ Client
constructor
A new instance of Client.
- #publish(payload, opts = {}) ⇒ Object
- #purge(release: false) ⇒ Object
- #release ⇒ Object
- #routes ⇒ Object
- #running? ⇒ Boolean
- #start ⇒ Object
- #stop(opts = {}) ⇒ Object
- #uri ⇒ Object
Methods included from Logging
Constructor Details
#initialize(config = {}) ⇒ Client
Returns a new instance of Client.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/msgr/client.rb', line 13

# Returns a new instance of Client.
#
# Starts from the defaults (host "127.0.0.1", vhost "/", max 2), merges the
# hash that +parse+ returns for the +:uri+ option when one is given, and
# finally merges every remaining option on top.
#
# Options are read from a symbolized copy of +config+, so the caller's hash
# is never modified (a shared settings hash keeps its +:uri+ entry and can be
# used to build further clients) and a string "uri" key is honoured too.
#
# @param config [Hash] connection options, keyed by symbols or strings
def initialize(config = {})
  @config = {
    host: '127.0.0.1',
    vhost: '/',
    max: 2,
  }

  # Delete :uri from a copy, not from the caller's hash.
  # NOTE(review): relies on symbolize_keys returning a new hash, as
  # ActiveSupport's Hash#symbolize_keys does - confirm.
  options = config.symbolize_keys
  @config.merge! parse(options.delete(:uri)) if options[:uri]
  @config.merge! options

  @mutex = ::Mutex.new
  @routes = load_routes
  @pid ||= ::Process.pid

  log(:debug) { "Created new client on process ##{@pid}..." }
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
11 12 13 |
# File 'lib/msgr/client.rb', line 11

# Returns the value of attribute config.
#
# @return [Hash] the hash held in +@config+
def config
  @config
end
Instance Method Details
#connect ⇒ Object
66 67 68 69 70 71 72 73 74 75 |
# File 'lib/msgr/client.rb', line 66

# Connects unless the connection is already running.
#
# Runs inside +mutex.synchronize+ and calls +check_process!+ first.
#
# @return [Object, nil] whatever +connection.connect+ returns, or +nil+ when
#   the connection was already running
def connect
  mutex.synchronize do
    check_process!

    unless connection.running?
      log(:debug) { "Connect to #{uri}..." }
      connection.connect
    end
  end
end
#drain ⇒ Object
Purge all queues known to Msgr, if they exist.
105 106 107 108 109 |
# File 'lib/msgr/client.rb', line 105

# Purge all queues known to Msgr, if they exist.
#
# Calls +connection.purge_queue+ with the name of every route in +@routes+.
# This method neither takes the mutex nor calls +check_process!+.
#
# @return [Object] whatever +@routes.each+ returns
def drain
  @routes.each { |known_route| connection.purge_queue(known_route.name) }
end
#publish(payload, opts = {}) ⇒ Object
111 112 113 114 115 116 |
# File 'lib/msgr/client.rb', line 111

# Publishes a payload by handing it to +sync_publish+.
#
# Runs inside +mutex.synchronize+ and calls +check_process!+ first.
#
# @param payload [Object] passed through to +sync_publish+ unchanged
# @param opts [Hash] passed through to +sync_publish+ unchanged
# @return [Object] whatever +sync_publish+ returns
def publish(payload, opts = {})
  mutex.synchronize do
    check_process!

    sync_publish(payload, opts)
  end
end
#purge(release: false) ⇒ Object
92 93 94 95 96 97 98 99 100 |
# File 'lib/msgr/client.rb', line 92

# Purges through the connection, logging the target URI first.
#
# Runs inside +mutex.synchronize+ and calls +check_process!+ first.
#
# @param release [Boolean] forwarded to +connection.purge+ as +release:+
# @return [Object] whatever +connection.purge+ returns
def purge(release: false)
  mutex.synchronize do
    check_process!

    log(:debug) { "Purge all queues on #{uri}..." }

    connection.purge(release: release)
  end
end
#release ⇒ Object
124 125 126 127 128 129 130 131 |
# File 'lib/msgr/client.rb', line 124

# Releases the connection if it is currently running.
#
# Runs inside +mutex.synchronize+ and calls +check_process!+ first.
#
# The running check asks +connection.running?+ directly rather than going
# through +#running?+: that method locks the same mutex again, and a ::Mutex
# raises ThreadError when the thread that already holds it tries to lock it
# a second time.
# NOTE(review): assumes +mutex+ is the ::Mutex created in #initialize - confirm.
#
# @return [Object, nil] whatever +connection.release+ returns, or +nil+ when
#   the connection is not running
def release
  mutex.synchronize do
    check_process!
    return unless connection.running?

    connection.release
  end
end
#routes ⇒ Object
118 119 120 121 122 |
# File 'lib/msgr/client.rb', line 118

# Reads +@routes+ while holding the mutex.
#
# @return [Object] the value stored in +@routes+
def routes
  mutex.synchronize { @routes }
end
#running? ⇒ Boolean
48 49 50 51 52 53 |
# File 'lib/msgr/client.rb', line 48

# Reports whether the connection is running.
#
# Runs inside +mutex.synchronize+ and calls +check_process!+ before asking
# the connection.
#
# @return [Boolean] the result of +connection.running?+
def running?
  mutex.synchronize do
    check_process!

    connection.running?
  end
end
#start ⇒ Object
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/msgr/client.rb', line 55

# Binds the loaded routes on the connection unless it is already running.
#
# Runs inside +mutex.synchronize+ and calls +check_process!+ first.
#
# @return [Object, nil] whatever +connection.bind+ returns, or +nil+ when the
#   connection was already running
def start
  mutex.synchronize do
    check_process!

    unless connection.running?
      log(:debug) { "Start on #{uri}..." }
      connection.bind(@routes)
    end
  end
end
#stop(opts = {}) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/msgr/client.rb', line 77

# Stops the client.
#
# In order: releases the connection, deletes through the connection when
# +opts[:delete]+ is truthy, closes the connection, shuts the dispatcher
# down and finally calls +reset+. Everything runs inside
# +mutex.synchronize+, after +check_process!+.
#
# @param opts [Hash] only the +:delete+ key is read here
# @return [Object] whatever +reset+ returns
def stop(opts = {})
  mutex.synchronize do
    check_process!
    log(:debug) { "Stop on #{uri}..." }

    # The teardown order below is significant; keep it as is.
    connection.release
    if opts[:delete]
      connection.delete
    end
    connection.close
    dispatcher.shutdown

    reset
  end
end
#uri ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/msgr/client.rb', line 30 def uri @uri = begin uri = ::URI.parse('amqp://localhost') uri.user = CGI.escape(config[:user]) if config.key?(:user) uri.password = '****' if config.key?(:pass) uri.host = config[:host] if config.key?(:host) uri.port = config[:port] if config.key?(:port) uri.scheme = config[:ssl] ? 'amqps' : 'amqp' if config.key?(:vhost) && config[:vhost] != '/' uri.path = "/#{CGI.escape(config[:vhost])}" end uri end end |