Class: Sidekiq::Throttled::Communicator
- Inherits:
-
Object
- Object
- Sidekiq::Throttled::Communicator
- Includes:
- ExceptionHandler, Singleton
- Defined in:
- lib/sidekiq/throttled/communicator.rb,
lib/sidekiq/throttled/communicator/listener.rb,
lib/sidekiq/throttled/communicator/callbacks.rb
Overview
Inter-process communication for sidekiq. It starts listener thread on sidekiq server and listens for incoming messages.
Instance Method Summary collapse
-
#initialize ⇒ Communicator
constructor
Initializes singleton instance.
-
#ready { ... } ⇒ void
Communicator readiness hook.
-
#receive(message) {|payload| ... } ⇒ void
Add incoming message handler.
-
#start_listener ⇒ void
Starts listener thread.
-
#stop_listener ⇒ void
Stops listener thread.
-
#transmit(redis, message, payload = nil) ⇒ void
Transmit message to listeners.
Constructor Details
#initialize ⇒ Communicator
Initializes singleton instance.
36 37 38 39 40 |
# File 'lib/sidekiq/throttled/communicator.rb', line 36 def initialize @callbacks = Callbacks.new @listener = nil @mutex = Mutex.new end |
Instance Method Details
#ready { ... } ⇒ void
This method returns an undefined value.
Communicator readiness hook.
110 111 112 113 |
# File 'lib/sidekiq/throttled/communicator.rb', line 110 def ready(&handler) @callbacks.on("ready", &handler) yield if @listener&.ready? end |
#receive(message) {|payload| ... } ⇒ void
This method returns an undefined value.
Add incoming message handler.
101 102 103 |
# File 'lib/sidekiq/throttled/communicator.rb', line 101 def receive(, &handler) @callbacks.on("message:#{}", &handler) end |
#start_listener ⇒ void
This method returns an undefined value.
Starts listener thread.
45 46 47 48 49 |
# File 'lib/sidekiq/throttled/communicator.rb', line 45 def start_listener @mutex.synchronize do @listener ||= Listener.new(CHANNEL_NAME, @callbacks) end end |
#stop_listener ⇒ void
This method returns an undefined value.
Stops listener thread.
54 55 56 57 58 59 |
# File 'lib/sidekiq/throttled/communicator.rb', line 54 def stop_listener @mutex.synchronize do @listener&.stop @listener = nil end end |
#transmit(redis, message, payload = nil) ⇒ void
This method returns an undefined value.
Transmit message to listeners.
84 85 86 |
# File 'lib/sidekiq/throttled/communicator.rb', line 84 def transmit(redis, , payload = nil) redis.publish(CHANNEL_NAME, Marshal.dump([.to_s, payload])) end |