Class: Fleck::Client
Defined Under Namespace
Instance Attribute Summary collapse
-
#local_ip ⇒ Object
readonly
Returns the value of attribute local_ip.
-
#remote_ip ⇒ Object
readonly
Returns the value of attribute remote_ip.
Instance Method Summary collapse
-
#initialize(connection, queue_name = "", exchange_type: :direct, exchange_name: "", multiple_responses: false, concurrency: 1) ⇒ Client
constructor
A new instance of Client.
- #publish(data, options) ⇒ Object
- #remove_request(request_id) ⇒ Object
- #request(action: nil, version: nil, headers: {}, params: {}, async: @multiple_responses || false, timeout: @default_timeout, queue: @queue_name, rmq_options: {}, &block) ⇒ Object
- #terminate ⇒ Object
Methods included from Loggable
Constructor Details
#initialize(connection, queue_name = "", exchange_type: :direct, exchange_name: "", multiple_responses: false, concurrency: 1) ⇒ Client
Returns a new instance of Client.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/fleck/client.rb', line 8
#
# Sets up the client state on top of an existing broker connection: records
# the socket addresses, opens a channel with the 'fleck' direct exchange,
# creates a second channel for the publishing exchange, declares an exclusive
# auto-delete reply queue bound to the 'fleck' exchange under its own name,
# and registers an at_exit hook that calls #terminate.
#
# @param connection [Object] broker connection; must respond to
#   `transport.socket` (used for the local/remote addresses) and to
#   `create_channel`. NOTE(review): presumably a Bunny::Session - confirm.
# @param queue_name [String] stored as @queue_name, which #request uses as
#   its default `queue:`.
# @param exchange_type [Symbol] type passed to Bunny::Exchange.new for the
#   publishing exchange.
# @param exchange_name [String] name passed to Bunny::Exchange.new for the
#   publishing exchange.
# @param multiple_responses [Boolean] stored on the client; when truthy the
#   default request timeout is 60, otherwise it is nil.
# @param concurrency [#to_i] how many times handle_responses! is invoked;
#   values below 1 are raised to 1.
# @return [Client] a new instance of Client.
def initialize(connection, queue_name = "", exchange_type: :direct, exchange_name: "", multiple_responses: false, concurrency: 1)
  @connection         = connection
  @queue_name         = queue_name
  @multiple_responses = multiple_responses
  @default_timeout    = multiple_responses ? 60 : nil
  # Coerce to an Integer and never go below one.
  @concurrency        = [concurrency.to_i, 1].max
  @requests           = ThreadSafe::Hash.new
  @subscriptions      = ThreadSafe::Array.new
  @terminated         = false
  @mutex              = Mutex.new

  socket     = @connection.transport.socket
  @local_ip  = socket.local_address.ip_address
  @remote_ip = socket.remote_address.ip_address

  @channel     = @connection.create_channel
  @exchange    = Bunny::Exchange.new(@channel, :direct, 'fleck')
  # The publisher gets its own channel, separate from the reply channel.
  @publisher   = Bunny::Exchange.new(@connection.create_channel, exchange_type, exchange_name)
  @reply_queue = @channel.queue("", exclusive: true, auto_delete: true)
  @reply_queue.bind(@exchange, routing_key: @reply_queue.name)

  # NOTE(review): handle_responses! is defined outside this listing;
  # presumably each call starts one reply-queue consumer - confirm.
  @concurrency.times { handle_responses! }

  logger.debug("Client initialized!")

  at_exit { terminate }
end
Instance Attribute Details
#local_ip ⇒ Object (readonly)
Returns the value of attribute local_ip.
6 7 8 |
# File 'lib/fleck/client.rb', line 6
#
# Reader for @local_ip, which the constructor fills from the connection
# socket's local address.
#
# @return [Object] the value of attribute local_ip.
def local_ip
  @local_ip
end
#remote_ip ⇒ Object (readonly)
Returns the value of attribute remote_ip.
6 7 8 |
# File 'lib/fleck/client.rb', line 6
#
# Reader for @remote_ip, which the constructor fills from the connection
# socket's remote address.
#
# @return [Object] the value of attribute remote_ip.
def remote_ip
  @remote_ip
end
Instance Method Details
#publish(data, options) ⇒ Object
62 63 64 65 |
# File 'lib/fleck/client.rb', line 62
#
# Hands a payload to the publishing exchange. The call is wrapped in
# @mutex.synchronize so only one publish runs at a time on this client.
# Once the client has been terminated nothing is published.
#
# @param data [Object] payload forwarded unchanged to @publisher.publish.
# @param options [Hash] publish options forwarded unchanged to
#   @publisher.publish.
# @return [Object, nil] whatever @publisher.publish returns, or nil when the
#   client is already terminated.
def publish(data, options)
  return if @terminated

  @mutex.synchronize { @publisher.publish(data, options) }
end
#remove_request(request_id) ⇒ Object
68 69 70 |
# File 'lib/fleck/client.rb', line 68
#
# Drops a request from the @requests registry.
#
# @param request_id [Object] key under which the request was stored.
# @return [Object, nil] the result of @requests.delete for that key.
def remove_request(request_id)
  @requests.delete(request_id)
end
#request(action: nil, version: nil, headers: {}, params: {}, async: @multiple_responses || false, timeout: @default_timeout, queue: @queue_name, rmq_options: {}, &block) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/fleck/client.rb', line 37
#
# Builds a Fleck::Client::Request, registers it in @requests under its id,
# sends it and returns its response object. If the client has already been
# terminated, no request is created and a 503 'Service Unavailable' response
# is returned instead.
#
# @param action [Object, nil] forwarded to the request as `action:`.
# @param version [Object, nil] forwarded to the request as `version:`.
# @param headers [Hash] forwarded to the request as `headers:`.
# @param params [Hash] forwarded to the request as `params:`.
# @param async [Boolean] passed to Request#send!; defaults to
#   @multiple_responses (or false when that is unset).
# @param timeout [Object, nil] forwarded as `timeout:`; defaults to
#   @default_timeout.
# @param queue [String] target queue name; defaults to @queue_name.
# @param rmq_options [Hash] forwarded to the request as `rmq_options:`.
# @yield block forwarded to Fleck::Client::Request.new.
# @return [Object] `request.response`, or a Fleck::Client::Response carrying
#   status 503 when the client is terminated.
def request(action: nil, version: nil, headers: {}, params: {}, async: @multiple_responses || false, timeout: @default_timeout, queue: @queue_name, rmq_options: {}, &block)
  if @terminated
    return Fleck::Client::Response.new(Oj.dump({status: 503, errors: ['Service Unavailable'], body: nil}, mode: :compat))
  end

  pending = Fleck::Client::Request.new(
    self, queue, @reply_queue.name,
    action:             action,
    version:            version,
    headers:            headers,
    params:             params,
    timeout:            timeout,
    multiple_responses: @multiple_responses,
    rmq_options:        rmq_options,
    &block
  )

  # Register before sending so the request is already in @requests when
  # send! runs.
  @requests[pending.id] = pending
  pending.send!(async)

  pending.response
end
#terminate ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/fleck/client.rb', line 73
#
# Shuts the client down: marks it terminated, cancels every entry in
# @subscriptions, then drains @requests, calling cancel! on each pending
# request. A failure while cancelling one request is logged and does not stop
# the remaining requests from being cancelled.
#
# @return [nil]
def terminate
  @terminated = true

  logger.info "Unsubscribing from #{@reply_queue.name}"
  @subscriptions.each(&:cancel) # stop receiving new messages

  logger.info "Canceling pending requests"
  # Hash#shift removes and returns one [id, request] pair per call and
  # yields nil once the registry is empty, which ends the loop.
  while (entry = @requests.shift)
    begin
      entry[1].cancel!
    rescue => e
      # Array() keeps the log call safe if the exception has no backtrace.
      logger.error "#{e.inspect}\n#{Array(e.backtrace).join("\n")}"
    end
  end
end