Class: LS4::HeartbeatClientService

Inherits:
Service show all
Defined in:
lib/ls4/service/heartbeat.rb

Direct Known Subclasses

HeartbeatMemberService

Instance Method Summary collapse

Methods inherited from Service

init

Methods included from EventBus::SingletonMixin

#ebus_bind!, #ebus_connect, extended

Methods included from EventBus::BusMixin

#ebus_all_slots, #ebus_disconnect!

Methods included from EventBus::DeclarerBase::Methods

#connect, #ebus_all_slots, #ebus_call_log, #ebus_call_slots, #ebus_signal_error, #ebus_signal_log, #ebus_signal_slots

Methods included from EventBus::DeclarerBase

#call_slot, #signal_slot

Constructor Details

#initialize ⇒ HeartbeatClientService

Returns a new instance of HeartbeatClientService.



72
73
74
# File 'lib/ls4/service/heartbeat.rb', line 72

# Creates the service with no heartbeat node id.
#
# @heartbeat_nid starts out as nil and is sent as the second argument of
# the :heartbeat RPC in do_heartbeat_blocking. Nothing in this class assigns
# it another value -- presumably a subclass such as HeartbeatMemberService
# sets it (TODO confirm).
def initialize
	@heartbeat_nid = nil
end

Instance Method Details

#do_heartbeat_blocking ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/ls4/service/heartbeat.rb', line 127

# Sends one heartbeat over @cs and waits for the reply.
#
# The current SyncBus hash is sent with @heartbeat_nid in a :heartbeat call.
# The reply is decoded into a HeartbeatResponse; when it reports
# sync_needed, SyncBus.try_sync is handed to ProcessBus.submit.
#
# Errors raised by the call, the decoding or the submit are logged to $log
# and swallowed. SyncBus.get_hash runs outside the rescue, so an error
# raised there propagates to the caller.
#
# Returns nil.
def do_heartbeat_blocking
	current_hash = SyncBus.get_hash
	begin
		raw_reply = @cs.call(:heartbeat, @heartbeat_nid, current_hash)
		reply = HeartbeatResponse.new.from_msgpack(raw_reply)
		ProcessBus.submit { SyncBus.try_sync } if reply.sync_needed
	rescue => e
		$log.error "heartbeat error: #{e}"
		$log.error_backtrace e.backtrace
	end
	nil
end

#heartbeat_blocking! ⇒ Object



144
145
146
# File 'lib/ls4/service/heartbeat.rb', line 144

# Performs a single heartbeat synchronously on the calling thread.
#
# Delegates to do_heartbeat_blocking and returns whatever it returns.
def heartbeat_blocking!
	do_heartbeat_blocking
end

#run ⇒ Object

def get_cs_session
	ProcessBus.get_session(ConfigBus.get_cs_address)
end

def on_timer
	do_heartbeat
end

def do_heartbeat
	sync_hash = SyncBus.get_hash
	get_cs_session.callback(:heartbeat, @heartbeat_nid, sync_hash) do |future|
		begin
			hbres = HeartbeatResponse.new.from_msgpack(future.get)
			ack_heartbeat(hbres)
		rescue
			$log.error "heartbeat error: #$!"
			$log.error_backtrace $!.backtrace
		end
	end
end

def ack_heartbeat(hbres)
	if hbres.sync_needed
		SyncBus.try_sync
	end
end

ebus_connect :ProcessBus, :on_timer

def heartbeat_blocking!
	do_heartbeat.join
end



110
111
112
113
114
115
116
117
118
119
# File 'lib/ls4/service/heartbeat.rb', line 110

# Starts the heartbeat loop.
#
# Opens a MessagePack::RPC::Client to the address returned by
# ConfigBus.get_cs_address (splatted into the constructor), clears the @end
# stop flag, and spawns a thread that sleeps one second and then calls
# do_heartbeat_blocking, repeating until @end becomes truthy.
#
# Returns the spawned Thread (also kept in @thread).
def run
	cs_address = ConfigBus.get_cs_address
	@cs = MessagePack::RPC::Client.new(*cs_address)
	@end = false
	@thread = Thread.new {
		until @end
			sleep 1
			do_heartbeat_blocking
		end
	}
end

#shutdown ⇒ Object



121
122
123
124
125
# File 'lib/ls4/service/heartbeat.rb', line 121

# Stops the heartbeat loop and closes the RPC client.
#
# Sets the @end flag that the thread started by run checks at the top of
# each iteration, then closes @cs. Returns the result of @cs.close.
#
# NOTE(review): the join below is commented out, so this method does not
# wait for the heartbeat thread; @cs may be closed while that thread is
# still sleeping or in the middle of a call. Expects run to have been called
# first, since @cs is only assigned there.
def shutdown
	@end = true
	#@thread.join
	@cs.close
end