Class: Moleculer::Transporters::Redis::Subscriber::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/moleculer/transporters/redis.rb

Overview

Represents a subscription

Instance Method Summary collapse

Constructor Details

#initialize(config:, channel:, block:) ⇒ Subscription

Returns a new instance of Subscription.



57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/moleculer/transporters/redis.rb', line 57

# Builds a subscription for a single channel.
#
# @param config  configuration object; this constructor reads its
#   +transporter+, +logger+, +serializer+ and +node_id+ attributes
# @param channel the channel name this subscription listens on
# @param block   callable stored in +@block+ (not invoked here)
def initialize(config:, channel:, block:)
  @config = config
  @channel = channel
  @block = block
  @node_id = config.node_id

  # Each subscription opens its own connection to the configured transporter URL.
  @connection = ::Redis.new(url: config.transporter)
  @logger = config.logger.get_child("[REDIS.TRANSPORTER.SUBSCRIPTION.#{channel}]")

  serializer_class = Serializers.for(config.serializer)
  @serializer = serializer_class.new(config)

  # A subscriber can only be told to disconnect and shut down by sending it a
  # message, so an internal disconnect token is generated up front.
  reset_disconnect
end

Instance Method Details

#connect ⇒ Object

Starts the subscriber



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/moleculer/transporters/redis.rb', line 73

# Starts the subscriber on a background thread stored in +@thread+.
#
# Messages arriving on +@channel+ are passed to +process_message+; any
# non-nil/non-false result is then passed to +process_packet+. Any
# StandardError raised while subscribing is logged at fatal level and is
# followed by +exit 1+.
#
# @return [self]
def connect
  @thread = Thread.new do
    begin
      @logger.debug "starting subscription to '#{@channel}'"
      @connection.subscribe(@channel) do |events|
        events.unsubscribe { unsubscribe }

        events.message do |_channel, payload|
          packet = process_message(payload)
          # process_message may yield nothing usable; such messages are dropped
          process_packet(packet) if packet
        end
      end
    rescue StandardError => e
      @logger.fatal e
      exit 1
    end
  end
  self
end

#disconnect ⇒ Object



97
98
99
100
101
102
# File 'lib/moleculer/transporters/redis.rb', line 97

# Signals the subscriber to shut down by publishing the current internal
# disconnect token on +@channel+.
#
# The token is sent over a separate, short-lived connection which is always
# closed afterwards, even if publishing raises.
#
# The connection targets the transporter URL from the configuration given to
# the constructor. (Previously this read +@uri+, which the constructor never
# assigns, so the signal went to the client's default server instead.)
def disconnect
  @logger.debug "unsubscribing from '#{@channel}'"
  redis = ::Redis.new(url: @config.transporter)
  begin
    redis.publish(@channel, @disconnect_hash.value)
  ensure
    # never leak the temporary publisher connection
    redis.disconnect!
  end
end

#reset_disconnect ⇒ Object



104
105
106
107
# File 'lib/moleculer/transporters/redis.rb', line 104

# Generates a fresh disconnect token of the form
# "<node_id>.<random hex>.disconnect" and stores it in +@disconnect_hash+,
# creating the atomic reference on first use and reusing it afterwards.
def reset_disconnect
  token = "#{@node_id}.#{SecureRandom.hex}.disconnect"
  @disconnect_hash = Concurrent::AtomicReference.new unless @disconnect_hash
  @disconnect_hash.set(token)
end