Class: LS4::DataServerService
- Inherits:
-
Service
- Object
- EventBus::Singleton
- Service
- LS4::DataServerService
- Defined in:
- lib/ls4/service/data_server.rb
Instance Attribute Summary collapse
-
#stat_cmd_delete ⇒ Object
readonly
Returns the value of attribute stat_cmd_delete.
-
#stat_cmd_read ⇒ Object
readonly
Returns the value of attribute stat_cmd_read.
-
#stat_cmd_write ⇒ Object
readonly
Returns the value of attribute stat_cmd_write.
Instance Method Summary collapse
-
#initialize ⇒ DataServerService
constructor
A new instance of DataServerService.
- #on_timer ⇒ Object
- #rpc_delete_direct(okey) ⇒ Object
-
#rpc_exist_direct(okey) ⇒ Object
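No description is provided for this method. The text shown here appears to be commented-out source for a disabled rpc_resize_direct method that the documentation generator attached to rpc_exist_direct:
    def rpc_resize_direct(okey, size)
      # TODO: stat_cmd_resize?
      # FIXME size field?
      d = UpdateLogData.new(okey.vtime, okey.key, nil, size)
      UpdateLogBus.append(d.dump) do
        StorageBus.resize(okey.vtime, okey.key, size)
      end
      nil
    end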
- #rpc_get_direct(okey) ⇒ Object
- #rpc_read_direct(okey, offset, size) ⇒ Object
- #rpc_replicate_notify(nid) ⇒ Object
- #rpc_replicate_pull(pos, limit) ⇒ Object
- #rpc_set_direct(okey, data) ⇒ Object
- #stat_db_items ⇒ Object
Methods inherited from Service
Methods included from EventBus::SingletonMixin
#ebus_bind!, #ebus_connect, extended
Methods included from EventBus::BusMixin
#ebus_all_slots, #ebus_disconnect!
Methods included from EventBus::DeclarerBase::Methods
#connect, #ebus_all_slots, #ebus_call_log, #ebus_call_slots, #ebus_signal_error, #ebus_signal_log, #ebus_signal_slots
Methods included from EventBus::DeclarerBase
Constructor Details
#initialize ⇒ DataServerService
Returns a new instance of DataServerService.
22 23 24 25 26 27 28 |
# File 'lib/ls4/service/data_server.rb', line 22

# Sets up the service state: stores ConfigBus.self_nid and
# ConfigBus.self_rsids for later use and starts the three command
# counters (read, write, delete) at zero.
def initialize
  @self_nid = ConfigBus.self_nid
  @self_rsids = ConfigBus.self_rsids
  # All counters begin at zero; they are exposed through the stat_cmd_* readers.
  @stat_cmd_read = @stat_cmd_write = @stat_cmd_delete = 0
end
Instance Attribute Details
#stat_cmd_delete ⇒ Object (readonly)
Returns the value of attribute stat_cmd_delete.
147 148 149 |
# File 'lib/ls4/service/data_server.rb', line 147

# Read-only accessor for @stat_cmd_delete, the counter that
# rpc_delete_direct increments on every call.
def stat_cmd_delete
  @stat_cmd_delete
end
#stat_cmd_read ⇒ Object (readonly)
Returns the value of attribute stat_cmd_read.
145 146 147 |
# File 'lib/ls4/service/data_server.rb', line 145

# Read-only accessor for @stat_cmd_read, the counter that rpc_get_direct
# and rpc_read_direct increment on every call.
def stat_cmd_read
  @stat_cmd_read
end
#stat_cmd_write ⇒ Object (readonly)
Returns the value of attribute stat_cmd_write.
146 147 148 |
# File 'lib/ls4/service/data_server.rb', line 146

# Read-only accessor for @stat_cmd_write, the counter that rpc_set_direct
# increments on every call.
def stat_cmd_write
  @stat_cmd_write
end
Instance Method Details
#on_timer ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/ls4/service/data_server.rb', line 127

# Timer callback. Collects the node ids that
# MasterSelectBus.select_master_static returns for every rsid in
# @self_rsids, then calls SlaveBus.try_replicate once for each distinct
# node id that is not @self_nid and that MembershipBus.is_fault does not
# flag.
#
# Returns the collected (non-deduplicated) list of node ids.
def on_timer
  candidates = []
  @self_rsids.each do |rsid|
    begin
      candidates.concat(MasterSelectBus.select_master_static(rsid))
    rescue
      # Best effort: an rsid whose lookup raises contributes no nodes.
    end
  end

  # Seeded with our own nid so this node is never passed to try_replicate.
  handled = [@self_nid]
  candidates.each do |nid|
    next if handled.include?(nid) || MembershipBus.is_fault(nid)

    session = MembershipBus.get_session_nid(nid)
    SlaveBus.try_replicate(nid, session)
    handled << nid
  end
end
#rpc_delete_direct(okey) ⇒ Object
72 73 74 75 76 77 78 79 80 |
# File 'lib/ls4/service/data_server.rb', line 72

# Deletes the object identified by okey (its vtime and key).
#
# Bumps @stat_cmd_delete, builds an UpdateLogData record for the object and
# hands its dump to UpdateLogBus.append; the StorageBus.delete call runs
# inside the block given to append.
#
# Returns whatever StorageBus.delete returned, or nil if the block was
# never invoked.
def rpc_delete_direct(okey)
  @stat_cmd_delete += 1
  log_entry = UpdateLogData.new(okey.vtime, okey.key)
  outcome = nil
  UpdateLogBus.append(log_entry.dump) { outcome = StorageBus.delete(okey.vtime, okey.key) }
  outcome
end
#rpc_exist_direct(okey) ⇒ Object
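No description is provided for this method. The text shown here appears to be commented-out source for a disabled rpc_resize_direct method that the documentation generator attached to rpc_exist_direct:
    def rpc_resize_direct(okey, size)
      # TODO: stat_cmd_resize?
      # FIXME size field?
      d = UpdateLogData.new(okey.vtime, okey.key, nil, size)
      UpdateLogBus.append(d.dump) do
        StorageBus.resize(okey.vtime, okey.key, size)
      end
      nil
    end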
68 69 70 |
# File 'lib/ls4/service/data_server.rb', line 68

# Asks StorageBus.exist about the object identified by okey's vtime and key
# and returns its answer unchanged. No statistics counter is touched.
def rpc_exist_direct(okey)
  vtime, key = okey.vtime, okey.key
  StorageBus.exist(vtime, key)
end
#rpc_get_direct(okey) ⇒ Object
30 31 32 33 |
# File 'lib/ls4/service/data_server.rb', line 30

# Fetches the object identified by okey's vtime and key through
# StorageBus.get and returns the result unchanged. Each call adds one to
# @stat_cmd_read.
def rpc_get_direct(okey)
  @stat_cmd_read += 1
  vtime, key = okey.vtime, okey.key
  StorageBus.get(vtime, key)
end
#rpc_read_direct(okey, offset, size) ⇒ Object
35 36 37 38 |
# File 'lib/ls4/service/data_server.rb', line 35

# Reads part of the object identified by okey's vtime and key by passing
# offset and size straight through to StorageBus.read; the result is
# returned unchanged. Each call adds one to @stat_cmd_read.
def rpc_read_direct(okey, offset, size)
  @stat_cmd_read += 1
  vtime, key = okey.vtime, okey.key
  StorageBus.read(vtime, key, offset, size)
end
#rpc_replicate_notify(nid) ⇒ Object
117 118 119 120 121 |
# File 'lib/ls4/service/data_server.rb', line 117

# Handles a replication notification for node nid: looks up that node's
# session via MembershipBus.get_session_nid and passes both to
# SlaveBus.try_replicate. Always returns nil.
def rpc_replicate_notify(nid)
  SlaveBus.try_replicate(nid, MembershipBus.get_session_nid(nid))
  nil
end
#rpc_replicate_pull(pos, limit) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/ls4/service/data_server.rb', line 82

# Walks the update log starting at pos and builds a batch of replication
# messages.
#
# pos   - log position to start reading from (passed to UpdateLogBus.get).
# limit - the loop stops once the summed data.size of the collected
#         messages exceeds this value (the entry that crosses the limit is
#         still included).
#
# For every log entry the current data is fetched from StorageBus:
# * entries with both offset and size are fetched with StorageBus.read;
# * all other entries are fetched whole with StorageBus.get, and their key
#   is remembered so that later log entries for the same key are skipped.
#
# Each message is [vtime, key, offset, data]; when the storage returns no
# data the message is [vtime, key, 0, nil].
#
# Returns [next_pos, msgs], where next_pos is the position following the
# last log entry that was consumed.
def rpc_replicate_pull(pos, limit)
  # Keys whose whole object is already queued. A Hash gives constant-time
  # membership tests (an Array scan here made large pulls quadratic).
  fetched_keys = {}
  msgs = []
  msize = 0

  while true
    raw, npos = UpdateLogBus.get(pos)
    break unless raw

    d = UpdateLogData.load(raw)

    # set or delete
    if fetched_keys.key?(d.key)
      # The whole object was already queued by an earlier entry; just advance.
      pos = npos
      next
    end

    if d.offset && d.size
      # Partial update: not recorded in fetched_keys, so later entries for
      # this key are still processed.
      data = StorageBus.read(d.vtime, d.key, d.offset, d.size)
    else
      data = StorageBus.get(d.vtime, d.key)
      fetched_keys[d.key] = true
    end

    # data may be nil => the object has been deleted
    if data
      msgs << [d.vtime, d.key, d.offset, data]
      msize += data.size
    else
      msgs << [d.vtime, d.key, 0, nil]
    end

    pos = npos
    break if msize > limit
  end

  [pos, msgs]
end
#rpc_set_direct(okey, data) ⇒ Object
40 41 42 43 44 45 46 47 |
# File 'lib/ls4/service/data_server.rb', line 40

# Stores data for the object identified by okey's vtime and key.
#
# Bumps @stat_cmd_write, builds an UpdateLogData record for the object and
# hands its dump to UpdateLogBus.append; the StorageBus.set call runs inside
# the block given to append. Always returns nil.
def rpc_set_direct(okey, data)
  @stat_cmd_write += 1
  log_entry = UpdateLogData.new(okey.vtime, okey.key)
  UpdateLogBus.append(log_entry.dump) { StorageBus.set(okey.vtime, okey.key, data) }
  nil
end
#stat_db_items ⇒ Object
123 124 125 |
# File 'lib/ls4/service/data_server.rb', line 123

# Statistics hook: returns whatever StorageBus.get_items reports, unchanged.
def stat_db_items
  StorageBus.get_items
end