Class: Moleculer::Transporters::Redis::Subscriber::Subscription
- Inherits: Object
  - Object
  - Moleculer::Transporters::Redis::Subscriber::Subscription
- Defined in: lib/moleculer/transporters/redis.rb
Overview
Represents a subscription to a single Redis channel.
Instance Method Summary
- #connect ⇒ Object
  Starts the subscriber.
- #disconnect ⇒ Object
- #initialize(config:, channel:, block:) ⇒ Subscription (constructor)
  A new instance of Subscription.
- #reset_disconnect ⇒ Object
Constructor Details
#initialize(config:, channel:, block:) ⇒ Subscription
Returns a new instance of Subscription.
    # File 'lib/moleculer/transporters/redis.rb', line 57

    def initialize(config:, channel:, block:)
      @connection = ::Redis.new(url: config.transporter)
      @channel = channel
      @block = block
      @logger = config.logger.get_child("[REDIS.TRANSPORTER.SUBSCRIPTION.#{channel}]")
      @serializer = Serializers.for(config.serializer).new(config)
      @node_id = config.node_id
      @config = config

      # it is necessary to send some sort of message to signal the subscriber to disconnect and shutdown
      # this is an internal message
      reset_disconnect
    end
Instance Method Details
#connect ⇒ Object
Starts the subscriber in a background thread and returns self.

    # File 'lib/moleculer/transporters/redis.rb', line 73

    def connect
      @thread = Thread.new do
        begin
          @logger.debug "starting subscription to '#{@channel}'"
          @connection.subscribe(@channel) do |on|
            on.unsubscribe do
              unsubscribe
            end

            on.message do |_, message|
              # call elided in the source listing
              packet = ()
              next unless packet

              process_packet(packet)
            end
          end
        rescue StandardError => error
          @logger.fatal error
          exit 1
        end
      end
      self
    end
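#disconnect ⇒ Object
Publishes the internal disconnect message on the channel over a separate, short-lived Redis connection.

    # File 'lib/moleculer/transporters/redis.rb', line 97

    def disconnect
      @logger.debug "unsubscribing from '#{@channel}'"
      # @uri is not assigned in #initialize, which connects with config.transporter
      redis = ::Redis.new(url: @uri)
      redis.publish(@channel, @disconnect_hash.value)
      redis.disconnect!
    end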
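The constructor comment says a message has to be sent to signal the subscriber to disconnect and shut down. How the listening thread reacts to that message is not part of this listing, so the following is only a minimal sketch of the general pattern, assuming the thread compares each incoming message against its own token. `FakeChannel` and `SketchSubscription` are hypothetical names, and an in-memory `Queue` stands in for the Redis channel so the example runs without a server.

```ruby
require "securerandom"

# Hypothetical stand-in for a Redis channel: a thread-safe in-memory queue.
class FakeChannel
  def initialize
    @queue = Queue.new
  end

  def publish(message)
    @queue << message
  end

  # Yields each published message until the block returns :stop.
  def subscribe
    loop do
      message = @queue.pop
      break if yield(message) == :stop
    end
  end
end

# Hypothetical subscriber that mirrors the connect/disconnect flow.
class SketchSubscription
  attr_reader :received

  def initialize(channel, node_id)
    @channel = channel
    # Assumed token format, borrowed from #reset_disconnect.
    @sentinel = "#{node_id}.#{SecureRandom.hex}.disconnect"
    @received = []
  end

  # Starts the listening thread and returns self, like #connect.
  def connect
    @thread = Thread.new do
      @channel.subscribe do |message|
        next :stop if message == @sentinel

        @received << message
        :continue
      end
    end
    self
  end

  # Publishes the sentinel so the listening thread stops on its own,
  # then waits for that thread to finish.
  def disconnect
    @channel.publish(@sentinel)
    @thread.join
  end
end

channel = FakeChannel.new
subscription = SketchSubscription.new(channel, "node-1").connect
channel.publish("hello")
channel.publish("world")
subscription.disconnect
p subscription.received # prints ["hello", "world"]
```

Sending the shutdown signal through the same channel as ordinary traffic means the blocked listener wakes up through its normal message path, so no second thread has to interrupt it.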
#reset_disconnect ⇒ Object
Stores a fresh disconnect token, built from the node ID and a random hex string, in an atomic reference.

    # File 'lib/moleculer/transporters/redis.rb', line 104

    def reset_disconnect
      @disconnect_hash ||= Concurrent::AtomicReference.new
      @disconnect_hash.set("#{@node_id}.#{SecureRandom.hex}.disconnect")
    end
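To make the token format concrete, here is a small sketch that builds tokens the same way `#reset_disconnect` does, using only the standard library. The helpers `disconnect_token` and `disconnect_token?` are hypothetical and are not part of the library. The suffix check is an assumption about how such tokens could be recognised, not a description of the library's behaviour.

```ruby
require "securerandom"

# Hypothetical helper mirroring the token format used by #reset_disconnect.
def disconnect_token(node_id)
  "#{node_id}.#{SecureRandom.hex}.disconnect"
end

# Hypothetical check: does a message end in the ".disconnect" segment?
def disconnect_token?(message)
  message.split(".").last == "disconnect"
end

first = disconnect_token("node-1")
second = disconnect_token("node-1")

puts first                        # e.g. node-1.<32 hex characters>.disconnect
puts first == second              # random part differs between calls
puts disconnect_token?(first)     # true
puts disconnect_token?("payload") # false
```

Because the random part changes on every call, a token published for one shutdown is unlikely to match the token in effect after a later reset.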