Class: Gruf::SynchronizedClient
- Defined in:
- lib/gruf/synchronized_client.rb
Overview
Ensures that we only have one active call to a given endpoint with a given set of params. This can be useful to mitigate thundering herds.
Instance Attribute Summary collapse
-
#unsynchronized_methods ⇒ Object
readonly
Returns the value of attribute unsynchronized_methods.
Attributes inherited from Client
#base_klass, #opts, #service_klass
Instance Method Summary collapse
-
#call(request_method, params = {}, metadata = {}, opts = {}, &block) ⇒ Gruf::Response
Call the client’s method with given params.
-
#initialize(service:, options: {}, client_options: {}) ⇒ SynchronizedClient
constructor
Initialize the client and setup the stub.
Methods inherited from Client
Methods included from Loggable
Constructor Details
#initialize(service:, options: {}, client_options: {}) ⇒ SynchronizedClient
Initialize the client and setup the stub
41 42 43 44 45 46 47 |
# File 'lib/gruf/synchronized_client.rb', line 41 def initialize(service:, options: {}, client_options: {}) @unsynchronized_methods = .delete(:unsynchronized_methods) { [] } @expiry = .delete(:internal_cache_expiry) { Gruf.synchronized_client_internal_cache_expiry } @locks = Concurrent::Map.new @results = Concurrent::Map.new super end |
Instance Attribute Details
#unsynchronized_methods ⇒ Object (readonly)
Returns the value of attribute unsynchronized_methods.
28 29 30 |
# File 'lib/gruf/synchronized_client.rb', line 28 def unsynchronized_methods @unsynchronized_methods end |
Instance Method Details
#call(request_method, params = {}, metadata = {}, opts = {}, &block) ⇒ Gruf::Response
Call the client’s method with given params. If another call is already active for the same endpoint and the same params, block until the active call is complete. When unblocked, callers will get a copy of the original result.
error type that was returned
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/gruf/synchronized_client.rb', line 62 def call(request_method, params = {}, = {}, opts = {}, &block) # Allow for bypassing extra behavior for selected methods return super if unsynchronized_methods.include?(request_method.to_sym) # Generate a unique key based on the method and params key = "#{request_method}.#{params.hash}" # Create a lock for this call if we haven't seen it already, then acquire it lock = @locks.compute_if_absent(key) { Mutex.new } lock.synchronize do # Return value from results cache if it exists. This occurs for callers that were # waiting on the lock while the first caller was making the actual grpc call. response = @results.get(lock) if response Gruf.logger.debug "Returning cached result for #{key}:#{lock.inspect}" next response end # Make the grpc call and record response for other callers that are blocked # on the same lock response = super @results.put(lock, response) # Schedule a task to come in later and clean out result to prevent memory bloat Concurrent::ScheduledTask.new(@expiry, args: [@results, lock]) { |h, k| h.delete(k) }.execute # Remove the lock from the map. The next caller to come through with the # same params will create a new lock and start the process over again. # Anyone who was waiting on this call will be using a local reference # to the same lock as us, and will fetch the result from the cache. @locks.delete(key) # Return response response end end |