45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/clustered_rpc/transport/redis_cluster.rb', line 45
def connect
return if !@redis_subscriber.nil? @redis_subscriber = ::Redis.new(ClusteredRpc.options)
@redis_publish = ::Redis.new(ClusteredRpc.options)
@subscribed = false
@subscriber_thread = Thread.new do
begin
@redis_subscriber.subscribe( @redis_message_pubsub_key ) do |on|
on.subscribe do |channel, subscriptions|
@retry_count = 0
ClusteredRpc.logger.info {"ClusteredRpc: Subscribed to ##{channel} (#{subscriptions} subscriptions)"}
@subscribed = true
end
on.message do |channel, message|
ClusteredRpc.logger.debug {"ClusteredRpc: Handling message ##{channel}: #{message}"}
begin
message = JSON.parse(message) rescue message
if message.is_a? Hash
result = run_method_from_message(message)
ClusteredRpc.logger.debug {"ClusteredRpc: Got result: #{result}"}
request_id = message['request_id']
if request_id
ClusteredRpc.logger.debug {"Setting ClusterSend result: request:#{request_id}[#{Process.pid}]"}
@redis_publish.pipelined do
@redis_publish.hmset "request:#{request_id}", ClusteredRpc.instance_id, result.to_json
@redis_publish.expire "request:#{request_id}", 600
end
end
else
ClusteredRpc.logger.warn "Unknown message type: #{message.class}"
end
rescue => e
ClusteredRpc.logger.error e.backtrace.join("\n")
ClusteredRpc.logger.error "Error[#{e.message}] Handling message ##{channel}: #{message}"
end
end
on.unsubscribe do |channel, subscriptions|
ClusteredRpc.logger.info {"ClusteredRpc: Unsubscribed from ##{channel} (#{subscriptions} subscriptions)"}
end
end
rescue Redis::BaseConnectionError => e
ClusteredRpc.logger.error e.message
@retry_count ||= 0
@retry_count += 1
sleep_seconds = [[@retry_count,10].min, 5].max
ClusteredRpc.logger.warn "ClusteredRpc: Retrying redis connection in #{sleep_seconds} seconds: #{@retry_count}"
ClusteredRpc.logger.info @config
sleep sleep_seconds
retry if @retry_count <= 300
ClusteredRpc.logger.warn "ClusteredRpc: Could not reconnect to Redis"
ensure
ClusteredRpc.logger.info "ClusteredRpc: Subscription thread terminated..."
@subscribed = false
end end attempts = 0
while !subscribed? do
sleep(1)
attempts += 1
ClusteredRpc.logger.info "ClusteredRpc: Waiting for subscription...#{attempts} times"
raise "ClusteredRpc: Could not subscribe after #{attempts} attempts" if attempts > 15
end
end
|