Class: Sidekiq::Throttled::Communicator

Inherits:
Object
Includes:
ExceptionHandler, Singleton
Defined in:
lib/sidekiq/throttled/communicator.rb,
lib/sidekiq/throttled/communicator/listener.rb,
lib/sidekiq/throttled/communicator/callbacks.rb

Overview

Inter-process communication for Sidekiq. It starts a listener thread on the Sidekiq server and listens for incoming messages.

Examples:


# Add incoming message handler for server
Communicator.instance.receive "knock" do |who|
  puts "#{who}'s knocking on the door"
end

# Emit message from console
Sidekiq.redis do |conn|
  Communicator.instance.transmit(conn, "knock", "ixti")
end

Constructor Details

#initialize ⇒ Communicator

Initializes singleton instance.



# File 'lib/sidekiq/throttled/communicator.rb', line 37

def initialize
  @callbacks = Callbacks.new
  @listener  = nil
  @mutex     = Mutex.new
end

Instance Method Details

#ready { ... } ⇒ void

This method returns an undefined value.

Communicator readiness hook.

Yields:

  • Runs the given block every time the listener thread subscribes to the Redis pub/sub channel.



# File 'lib/sidekiq/throttled/communicator.rb', line 111

def ready(&handler)
  @callbacks.on("ready", &handler)
  yield if @listener && @listener.ready?
end
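
The listing above registers the handler and also runs it immediately when the listener is already subscribed. A minimal sketch of that pattern follows, with no Redis involved. `ReadyHook` and `mark_ready!` are hypothetical names used only for illustration; they are not part of the library.

```ruby
# Hypothetical stand-in that mimics the "register, and fire now if
# already ready" behaviour of Communicator#ready.
class ReadyHook
  def initialize
    @handlers = []
    @ready    = false
  end

  # Stores the handler for future readiness events and runs it once
  # right away when the hook is already in the ready state.
  def ready(&handler)
    @handlers << handler
    yield if @ready
  end

  # Simulates the listener (re)subscribing: every stored handler runs.
  def mark_ready!
    @ready = true
    @handlers.each(&:call)
  end
end

log  = []
hook = ReadyHook.new

hook.ready { log << :early } # registered before readiness, runs later
hook.mark_ready!             # first subscription
hook.ready { log << :late }  # registered after readiness, runs at once
hook.mark_ready!             # simulated re-subscription runs both again

p log
```

Handlers registered early run on the first subscription, handlers registered late run immediately, and every handler runs again on each later subscription.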

#receive(message) {|payload| ... } ⇒ void

This method returns an undefined value.

Add incoming message handler.

Examples:


Communicator.instance.receive "knock" do |payload|
  # do something upon `knock` message
end

Parameters:

  • message (#to_s)

Yields:

  • (payload)

    Runs the given block every time `message` is received.

Yield Parameters:

  • payload (Object, nil)

    Payload that was transmitted

Yield Returns:

  • (void)


# File 'lib/sidekiq/throttled/communicator.rb', line 102

def receive(message, &handler)
  @callbacks.on("message:#{message}", &handler)
end
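
`receive` stores the handler under a namespaced key, `"message:#{message}"`. The `Callbacks` class itself is not shown on this page, so the following is only a sketch of how such a keyed registry might behave. `TinyCallbacks` and its `run` method are assumptions made for illustration.

```ruby
# Hypothetical minimal registry; the real Callbacks class may differ.
class TinyCallbacks
  def initialize
    # Each event name maps to a list of handlers.
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  # Registers a handler block for the given event name.
  def on(event, &handler)
    raise ArgumentError, "block required" unless handler

    @handlers[event.to_s] << handler
  end

  # Invokes every handler registered for the event with the payload.
  def run(event, payload = nil)
    @handlers[event.to_s].each { |handler| handler.call(payload) }
  end
end

callbacks = TinyCallbacks.new
seen      = []

callbacks.on("message:knock") { |who| seen << who }

callbacks.run("message:knock", "ixti")   # matching key: handler runs
callbacks.run("message:other", "nobody") # no handler for this key

p seen
```

Prefixing the key with `message:` keeps user-defined message names from colliding with internal events such as `"ready"`.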

#start_listener ⇒ void

This method returns an undefined value.

Starts listener thread.



# File 'lib/sidekiq/throttled/communicator.rb', line 46

def start_listener
  @mutex.synchronize do
    @listener ||= Listener.new(CHANNEL_NAME, @callbacks)
  end
end
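
Wrapping `||=` in `@mutex.synchronize` makes the lazy initialization safe when several threads call `start_listener` at once, so at most one listener is created. The sketch below demonstrates the idea with stand-in classes. `FakeListener` and `Starter` are hypothetical and exist only for this example.

```ruby
# Hypothetical stand-in for Listener that counts how many were built.
class FakeListener
  @built = 0

  class << self
    attr_accessor :built
  end

  def initialize
    self.class.built += 1
  end
end

# Hypothetical owner that mirrors the start_listener locking pattern.
class Starter
  def initialize
    @listener = nil
    @mutex    = Mutex.new
  end

  # Only the first caller constructs the listener; later callers get
  # the same instance back.
  def start
    @mutex.synchronize { @listener ||= FakeListener.new }
  end
end

starter   = Starter.new
listeners = Array.new(10) { Thread.new { starter.start } }.map(&:value)

puts FakeListener.built   # 1
puts listeners.uniq.size  # 1
```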

#stop_listener ⇒ void

This method returns an undefined value.

Stops listener thread.



# File 'lib/sidekiq/throttled/communicator.rb', line 55

def stop_listener
  @mutex.synchronize do
    @listener.stop if @listener
    @listener = nil
  end
end

#transmit(redis, message, payload = nil) ⇒ void

This method returns an undefined value.

Transmit message to listeners.

Examples:


Sidekiq.redis do |conn|
  Communicator.instance.transmit(conn, "knock")
end

Parameters:

  • redis (Redis)

    Redis client

  • message (#to_s)
  • payload (Object) (defaults to: nil)


# File 'lib/sidekiq/throttled/communicator.rb', line 85

def transmit(redis, message, payload = nil)
  redis.publish(CHANNEL_NAME, Marshal.dump([message.to_s, payload]))
end
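
The wire format produced by `transmit` can be inspected without a Redis connection. The sketch below encodes a message the same way the listing above does and decodes it again. The decoding step assumes the listener reverses the encoding with `Marshal.load`, which is not shown on this page.

```ruby
message = :knock
payload = { "who" => "ixti" }

# Same encoding as in `transmit`: a two-element array holding the
# message name as a String and an arbitrary payload object.
encoded = Marshal.dump([message.to_s, payload])

# Assumption: the receiving side decodes with Marshal.load.
decoded_message, decoded_payload = Marshal.load(encoded)

puts decoded_message          # knock
puts decoded_payload["who"]   # ixti
```

Because the payload goes through `Marshal`, it must be a marshalable object (no procs or IO objects), and `Marshal.load` should only ever be applied to data from a trusted source.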