Class: Sidekiq::Throttled::Communicator

Inherits:
Object
Includes:
ExceptionHandler, Singleton
Defined in:
lib/sidekiq/throttled/communicator.rb,
lib/sidekiq/throttled/communicator/listener.rb,
lib/sidekiq/throttled/communicator/callbacks.rb

Overview

Inter-process communication for Sidekiq. It starts a listener thread on the Sidekiq server and listens for incoming messages.

Examples:


# Add incoming message handler for server
Communicator.instance.receive "knock" do |who|
  puts "#{who}'s knocking on the door"
end

# Emit message from console
Sidekiq.redis do |conn|
  Communicator.instance.transmit(conn, "knock", "ixti")
end

Constructor Details

#initialize ⇒ Communicator

Initializes singleton instance.



# File 'lib/sidekiq/throttled/communicator.rb', line 36

def initialize
  @callbacks = Callbacks.new
  @listener  = nil
  @mutex     = Mutex.new
end

Instance Method Details

#ready { ... } ⇒ void

This method returns an undefined value.

Communicator readiness hook.

Yields:

  • Runs the given block every time the listener thread subscribes to the Redis pub/sub channel.



# File 'lib/sidekiq/throttled/communicator.rb', line 110

def ready(&handler)
  @callbacks.on("ready", &handler)
  yield if @listener&.ready?
end
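
The listing above both registers the block for future subscriptions and runs it at once when the listener is already subscribed. A minimal sketch of that pattern follows. It assumes a hypothetical `ReadyHook` class, which is not part of the gem and stands in for the real listener and callbacks:

```ruby
# Hypothetical stand-in for the communicator's readiness handling.
class ReadyHook
  def initialize
    @handlers = []
    @ready    = false
  end

  # Registers the block; runs it right away when already subscribed.
  def ready(&handler)
    @handlers << handler
    handler.call if @ready
  end

  # Simulates the listener (re)subscribing to the pub/sub channel.
  def subscribed!
    @ready = true
    @handlers.each(&:call)
  end
end

calls = []
hook  = ReadyHook.new
hook.ready { calls << :early } # registered before the first subscription
hook.subscribed!               # runs the :early handler
hook.ready { calls << :late }  # already subscribed, so it runs immediately
hook.subscribed!               # a re-subscription runs both handlers again
p calls # => [:early, :late, :early, :late]
```

Because the hook fires on every subscription, a handler registered this way should be safe to run more than once.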

#receive(message) {|payload| ... } ⇒ void

This method returns an undefined value.

Add incoming message handler.

Examples:


Communicator.instance.receive "knock" do |payload|
  # do something upon `knock` message
end

Parameters:

  • message (#to_s)

Yields:

  • (payload)

    Runs the given block every time `message` is received.

Yield Parameters:

  • payload (Object, nil)

    Payload that was transmitted

Yield Returns:

  • (void)


# File 'lib/sidekiq/throttled/communicator.rb', line 101

def receive(message, &handler)
  @callbacks.on("message:#{message}", &handler)
end
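
Handlers are stored under a namespaced event key (`"message:#{message}"`), so several blocks can listen for the same message. The sketch below illustrates that idea with a hypothetical `SketchCallbacks` registry. The gem's own `Callbacks` class may be implemented differently:

```ruby
# Hypothetical, simplified registry; not the gem's Callbacks class.
class SketchCallbacks
  def initialize
    @mutex    = Mutex.new
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  # Stores the block under the given event name.
  def on(event, &handler)
    raise ArgumentError, "no block given" unless handler

    @mutex.synchronize { @handlers[event.to_s] << handler }
    self
  end

  # Calls every handler registered for the event with the payload.
  def run(event, payload = nil)
    handlers = @mutex.synchronize { @handlers[event.to_s].dup }
    handlers.each { |handler| handler.call(payload) }
  end
end

callbacks = SketchCallbacks.new
callbacks.on("message:knock") { |who| puts "#{who}'s knocking on the door" }
callbacks.run("message:knock", "ixti")   # the handler receives "ixti"
callbacks.run("message:other", "nobody") # no handler registered, nothing runs
```

Copying the handler list inside the lock and calling the blocks outside it keeps a slow handler from blocking new registrations.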

#start_listener ⇒ void

This method returns an undefined value.

Starts listener thread.



# File 'lib/sidekiq/throttled/communicator.rb', line 45

def start_listener
  @mutex.synchronize do
    @listener ||= Listener.new(CHANNEL_NAME, @callbacks)
  end
end
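
Wrapping `||=` in `@mutex.synchronize` means concurrent callers cannot each create their own listener. A small sketch of that guarantee, using a hypothetical `LazyHolder` class with a plain `Object` in place of `Listener.new(...)`:

```ruby
# Hypothetical stand-in: counts how many times the "listener" is built.
class LazyHolder
  attr_reader :builds

  def initialize
    @mutex    = Mutex.new
    @listener = nil
    @builds   = 0
  end

  # Builds the listener once; later calls return the same object.
  def start
    @mutex.synchronize do
      @listener ||= begin
        @builds += 1
        Object.new # placeholder for Listener.new(CHANNEL_NAME, @callbacks)
      end
    end
  end
end

holder  = LazyHolder.new
results = Array.new(8) { Thread.new { holder.start } }.map(&:value)
puts results.uniq.size # number of distinct listener objects seen by threads
puts holder.builds     # number of times the listener was constructed
```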

#stop_listener ⇒ void

This method returns an undefined value.

Stops listener thread.



# File 'lib/sidekiq/throttled/communicator.rb', line 54

def stop_listener
  @mutex.synchronize do
    @listener&.stop
    @listener = nil
  end
end
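
The safe navigation operator makes stopping a no-op when no listener exists, and resetting `@listener` to `nil` lets a later start build a fresh one. The following sketch shows both properties. `FakeListener` and `StoppableHolder` are hypothetical names used only for illustration:

```ruby
# Hypothetical listener that only records whether it was stopped.
class FakeListener
  def initialize
    @stopped = false
  end

  def stop
    @stopped = true
  end

  def stopped?
    @stopped
  end
end

# Hypothetical holder mirroring the start/stop pair shown above.
class StoppableHolder
  def initialize
    @mutex    = Mutex.new
    @listener = nil
  end

  def start
    @mutex.synchronize { @listener ||= FakeListener.new }
  end

  def stop
    @mutex.synchronize do
      @listener&.stop # skipped when @listener is nil
      @listener = nil
    end
  end
end

holder = StoppableHolder.new
holder.stop                # nothing started yet, no error raised
first = holder.start
holder.stop
puts first.stopped?        # the old listener was told to stop
puts holder.start.equal?(first) # a new start builds a different listener
```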

#transmit(redis, message, payload = nil) ⇒ void

This method returns an undefined value.

Transmit message to listeners.

Examples:


Sidekiq.redis do |conn|
  Communicator.instance.transmit(conn, "knock")
end

Parameters:

  • redis (Redis)

    Redis client

  • message (#to_s)
  • payload (Object) (defaults to: nil)


# File 'lib/sidekiq/throttled/communicator.rb', line 84

def transmit(redis, message, payload = nil)
  redis.publish(CHANNEL_NAME, Marshal.dump([message.to_s, payload]))
end
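
The published value is a marshalled two-element array of the message name and the payload. The sketch below shows that encoding and a matching decode step without touching Redis. The `encode` and `decode` helpers are hypothetical, and the decode side is an assumption about how a receiver could unpack the data:

```ruby
# Encodes the way #transmit does: [message name as String, payload].
def encode(message, payload = nil)
  Marshal.dump([message.to_s, payload])
end

# Hypothetical receiving side: restores the [name, payload] pair.
def decode(data)
  Marshal.load(data)
end

message, payload = decode(encode(:knock, { "who" => "ixti" }))
puts message         # the Symbol name was converted with #to_s
puts payload.inspect # the Hash payload survives the round trip
```

Since the payload goes through `Marshal`, it has to be a marshallable object (no procs or IO objects), and `Marshal.load` should only be given data from a trusted channel.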