Class: LS4::DataServerService

Inherits:
Service show all
Defined in:
lib/ls4/service/data_server.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Service

init

Methods included from EventBus::SingletonMixin

#ebus_bind!, #ebus_connect, extended

Methods included from EventBus::BusMixin

#ebus_all_slots, #ebus_disconnect!

Methods included from EventBus::DeclarerBase::Methods

#connect, #ebus_all_slots, #ebus_call_log, #ebus_call_slots, #ebus_signal_error, #ebus_signal_log, #ebus_signal_slots

Methods included from EventBus::DeclarerBase

#call_slot, #signal_slot

Constructor Details

#initialize ⇒ DataServerService

Returns a new instance of DataServerService.



22
23
24
25
26
27
28
# File 'lib/ls4/service/data_server.rb', line 22

# Captures this node's id and rsid list from ConfigBus and resets the
# read/write/delete command counters to zero.
def initialize
	@self_nid = ConfigBus.self_nid
	@self_rsids = ConfigBus.self_rsids
	# Counters incremented by the rpc_*_direct handlers of this service.
	@stat_cmd_read = @stat_cmd_write = @stat_cmd_delete = 0
end

Instance Attribute Details

#stat_cmd_delete ⇒ Object (readonly)

Returns the value of attribute stat_cmd_delete.



147
148
149
# File 'lib/ls4/service/data_server.rb', line 147

# Reader for the @stat_cmd_delete counter (incremented by rpc_delete_direct).
def stat_cmd_delete; @stat_cmd_delete; end

#stat_cmd_read ⇒ Object (readonly)

Returns the value of attribute stat_cmd_read.



145
146
147
# File 'lib/ls4/service/data_server.rb', line 145

# Reader for the @stat_cmd_read counter (incremented by rpc_get_direct and
# rpc_read_direct).
def stat_cmd_read; @stat_cmd_read; end

#stat_cmd_write ⇒ Object (readonly)

Returns the value of attribute stat_cmd_write.



146
147
148
# File 'lib/ls4/service/data_server.rb', line 146

# Reader for the @stat_cmd_write counter (incremented by rpc_set_direct).
def stat_cmd_write; @stat_cmd_write; end

Instance Method Details

#on_timer ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/ls4/service/data_server.rb', line 127

# Presumably invoked periodically (name suggests a timer callback; confirm
# against the caller).
#
# Gathers the nids returned by MasterSelectBus.select_master_static for each
# of this node's rsids, then calls SlaveBus.try_replicate once for every
# distinct nid that is neither this node nor reported faulty by
# MembershipBus.is_fault.
def on_timer
	candidates = []
	@self_rsids.each do |rsid|
		begin
			candidates.concat MasterSelectBus.select_master_static(rsid)
		rescue
			# Best effort: an rsid whose lookup raises contributes no nids.
		end
	end

	# Seeded with our own nid so we never replicate from ourselves.
	visited = [@self_nid]
	candidates.each do |nid|
		next if visited.include?(nid)
		next if MembershipBus.is_fault(nid)
		SlaveBus.try_replicate(nid, MembershipBus.get_session_nid(nid))
		visited << nid
	end
end

#rpc_delete_direct(okey) ⇒ Object



72
73
74
75
76
77
78
79
80
# File 'lib/ls4/service/data_server.rb', line 72

# Counts a delete command, appends an update-log record for okey, and deletes
# the object from storage inside the block handed to UpdateLogBus.append.
#
# Returns whatever StorageBus.delete returned, or nil if the block never ran.
def rpc_delete_direct(okey)
	@stat_cmd_delete += 1
	record = UpdateLogData.new(okey.vtime, okey.key)
	outcome = nil
	UpdateLogBus.append(record.dump) {
		outcome = StorageBus.delete(okey.vtime, okey.key)
	}
	outcome
end

#rpc_exist_direct(okey) ⇒ Object

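Note: the text below appears to be a commented-out method that sits directly above rpc_exist_direct in the source file and was picked up as its docstring; it is not part of the active API.

    # def rpc_resize_direct(okey, size)
    #   # TODO: stat_cmd_resize?
    #   # FIXME size field?
    #   d = UpdateLogData.new(okey.vtime, okey.key, nil, size)
    #   UpdateLogBus.append(d.dump) do
    #     StorageBus.resize(okey.vtime, okey.key, size)
    #   end
    #   nil
    # end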



68
69
70
# File 'lib/ls4/service/data_server.rb', line 68

# Returns the result of StorageBus.exist for okey's vtime and key.
# Unlike the get/read handlers, this does not touch any stat counter.
def rpc_exist_direct(okey)
	vtime, key = okey.vtime, okey.key
	StorageBus.exist(vtime, key)
end

#rpc_get_direct(okey) ⇒ Object



30
31
32
33
# File 'lib/ls4/service/data_server.rb', line 30

# Counts a read command and returns the result of StorageBus.get for okey's
# vtime and key.
def rpc_get_direct(okey)
	@stat_cmd_read += 1
	vtime, key = okey.vtime, okey.key
	StorageBus.get(vtime, key)
end

#rpc_read_direct(okey, offset, size) ⇒ Object



35
36
37
38
# File 'lib/ls4/service/data_server.rb', line 35

# Counts a read command and returns the result of StorageBus.read for okey's
# vtime and key, restricted to the given offset and size.
def rpc_read_direct(okey, offset, size)
	@stat_cmd_read += 1
	vtime, key = okey.vtime, okey.key
	StorageBus.read(vtime, key, offset, size)
end

#rpc_replicate_notify(nid) ⇒ Object



117
118
119
120
121
# File 'lib/ls4/service/data_server.rb', line 117

# Looks up the session for nid via MembershipBus and hands both to
# SlaveBus.try_replicate. Always returns nil.
def rpc_replicate_notify(nid)
	SlaveBus.try_replicate(nid, MembershipBus.get_session_nid(nid))
	nil
end

#rpc_replicate_pull(pos, limit) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/ls4/service/data_server.rb', line 82

def rpc_replicate_pull(pos, limit)
	mkeys = []
	msgs = []
	msize = 0
	while true
		raw, npos = UpdateLogBus.get(pos)
		unless raw
			break
		end
		d = UpdateLogData.load(raw)
		# set or delete
		if mkeys.include?(d.key)
			pos = npos
		else
			if d.offset && d.size
				data = StorageBus.read(d.vtime, d.key, d.offset, d.size)
			else
				data = StorageBus.get(d.vtime, d.key)
				mkeys << d.key
			end
			# data may be null => deleted
			if data
				msgs << [d.vtime, d.key, d.offset, data]
				msize += data.size
			else
				# data is deleted
				msgs << [d.vtime, d.key, 0, nil]
			end
			pos = npos
			break if msize > limit
		end
	end
	[pos, msgs]
end

#rpc_set_direct(okey, data) ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/ls4/service/data_server.rb', line 40

# Counts a write command, appends an update-log record for okey, and stores
# data inside the block handed to UpdateLogBus.append. Always returns nil.
def rpc_set_direct(okey, data)
	@stat_cmd_write += 1
	record = UpdateLogData.new(okey.vtime, okey.key)
	UpdateLogBus.append(record.dump) {
		StorageBus.set(okey.vtime, okey.key, data)
	}
	nil
end

#stat_db_items ⇒ Object



123
124
125
# File 'lib/ls4/service/data_server.rb', line 123

# Returns whatever StorageBus.get_items reports (presumably the number of
# stored items; confirm against the storage implementation).
def stat_db_items
	items = StorageBus.get_items
	items
end