Class: DeepConnect::Session

Inherits:
Object show all
Defined in:
lib/deep-connect/session.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(deep_space, port, local_id = nil) ⇒ Session

Returns a new instance of Session.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/deep-connect/session.rb', line 23

# Sets up a session for +deep_space+ that talks over +port+.
#
# The session starts in the :INITIALIZE state with an empty export queue,
# an empty table of waiting requests, and a request-event counter at 0.
# +local_id+ is accepted but not used by this constructor.
def initialize(deep_space, port, local_id = nil)
  @status = :INITIALIZE

  @organizer, @deep_space, @port = deep_space.organizer, deep_space, port

  # Events waiting to be exported are buffered here.
  @export_queue = Queue.new

  # Requests awaiting a reply, keyed by event sequence number.
  @waiting_mutex = Mutex.new
  @waiting = {}

  @next_request_event_id_mutex = Mutex.new
  @next_request_event_id = 0

  @last_keep_alive = nil
end

Instance Attribute Details

#deep_space ⇒ Object (readonly)

Returns the value of attribute deep_space.



41
42
43
# File 'lib/deep-connect/session.rb', line 41

# Reader for the deep space object this session was constructed with.
def deep_space
  @deep_space
end

#organizer ⇒ Object (readonly)

Returns the value of attribute organizer.



40
41
42
# File 'lib/deep-connect/session.rb', line 40

# Reader for the organizer, taken from the deep space at construction time.
def organizer
  @organizer
end

Instance Method Details

#accept(ev) ⇒ Object

イベントの受け取り



170
171
172
# File 'lib/deep-connect/session.rb', line 170

# Accepts an event by appending +ev+ to the export queue.
# Returns the export queue.
def accept(ev)
  @export_queue << ev
end

#asynchronus_send_to(ref, method, args = [], callback = nil) ⇒ Object Also known as: asyncronus_send_to



200
201
202
203
204
205
206
207
208
209
210
# File 'lib/deep-connect/session.rb', line 200

# Sends an asynchronous request for +method+ on +ref+ and returns at once.
#
# The request event is recorded in the waiting table under its sequence
# number and pushed onto the export queue. Raises SessionServiceStopped
# (through DC.Raise) unless the session is in the :SERVICING state.
#
# @return [nil] always
def asynchronus_send_to(ref, method, args=[], callback=nil)
  DC.Raise(SessionServiceStopped) unless @status == :SERVICING

  request = Event::AsyncronusRequest.request(self, ref, method, args, callback)
  @waiting_mutex.synchronize { @waiting[request.seq] = request }
  @export_queue.push(request)
  nil
end

#block_yield(event, args) ⇒ Object



191
192
193
194
195
196
197
198
# File 'lib/deep-connect/session.rb', line 191

# Builds an iterator call-back request from +event+ and +args+, records it
# in the waiting table under its sequence number, and queues it for export.
#
# @return the newly created call-back request event
def block_yield(event, args)
  call_back = Event::IteratorCallBackRequest.call_back_event(event, args)
  @waiting_mutex.synchronize { @waiting[call_back.seq] = call_back }
  @export_queue.push(call_back)
  call_back
end

#deregister_root_impl(idsdump) ⇒ Object



299
300
301
302
303
# File 'lib/deep-connect/session.rb', line 299

# Unmarshals +idsdump+ and asks the deep space to delete those roots.
#
# NOTE(review): idsdump appears to be the payload built by
# deregister_root_to_peer on the other side of the connection; Marshal.load
# is unsafe on untrusted data -- confirm the peer is trusted.
#
# @return [nil] always
def deregister_root_impl(idsdump)
  @deep_space.delete_roots(Marshal.load(idsdump))
  nil
end

#deregister_root_to_peer(ids) ⇒ Object



294
295
296
297
# File 'lib/deep-connect/session.rb', line 294

# Marshals +ids+ and sends them to the peer as a :deregister_root session
# request for which no reply is awaited.
def deregister_root_to_peer(ids)
  send_peer_session_no_recv(:deregister_root, Marshal.dump(ids))
end

#get_service(name, waitp = false) ⇒ Object



259
260
261
262
263
264
# File 'lib/deep-connect/session.rb', line 259

# Asks the peer session for the service called +name+.
#
# Raises NoServiceError (through DC.Raise) when the peer answers with the
# :DEEPCONNECT_NO_SUCH_SERVICE marker; otherwise returns the peer's answer.
def get_service(name, waitp = false)
  service = send_peer_session(:get_service, name, waitp)
  DC.Raise(NoServiceError, name) if service == :DEEPCONNECT_NO_SUCH_SERVICE
  service
end

#get_service_impl(name, waitp = false) ⇒ Object



266
267
268
# File 'lib/deep-connect/session.rb', line 266

# Looks up the service +name+ through the organizer, forwarding +waitp+
# unchanged. The "_impl" suffix matches the method name that
# send_peer_session builds for a :get_service request.
def get_service_impl(name, waitp = false)
  @organizer.service(name, waitp)
end

#import_mq(name, waitp = false) ⇒ Object



271
272
273
274
275
276
# File 'lib/deep-connect/session.rb', line 271

# Asks the peer session for the message queue called +name+.
#
# Raises NoServiceError (through DC.Raise) when the peer's answer is nil or
# false; otherwise returns the answer.
def import_mq(name, waitp = false)
  mq = send_peer_session(:import_mq, name, waitp)
  DC.Raise(NoServiceError, name) unless mq
  mq
end

#import_mq_impl(name, waitp = false) ⇒ Object



278
279
280
# File 'lib/deep-connect/session.rb', line 278

# Fetches the message queue +name+ through the organizer, forwarding +waitp+
# unchanged. The "_impl" suffix matches the method name that
# send_peer_session builds for an :import_mq request.
def import_mq_impl(name, waitp = false)
  @organizer.get_mq(name, waitp)
end

#keep_alive ⇒ Object

# Unmarshals +spec_dump+ and hands the result to @object_space.
#
# NOTE(review): @object_space is not assigned anywhere in the visible code
# (the constructor sets @deep_space, not @object_space), and the visible
# send_class_specs sends :recv_class_specs rather than :send_class_specs, so
# nothing visible here invokes this method -- confirm whether it is still
# used and which receiver was intended.
# NOTE(review): Marshal.load is unsafe on untrusted data.
def send_class_specs_impl(spec_dump)

  specs = Marshal.load(spec_dump)
  @object_space.recv_class_specs(specs)
end


329
330
331
332
333
334
335
336
337
338
339
340
341
342
# File 'lib/deep-connect/session.rb', line 329

# Checks whether the session is still alive and, if so, pings the peer.
#
# The session counts as dead once the organizer's current tick exceeds the
# last keep-alive tick by more than ten keep-alive intervals. A live session
# sends a :recv_keep_alive request to the peer without waiting for a reply.
# Diagnostics are printed only when Conf.DISPLAY_KEEP_ALIVE is set.
#
# @return [Boolean] false when the session is considered dead, true otherwise
def keep_alive
  now = @organizer.tick
  deadline = @last_keep_alive + Conf.KEEP_ALIVE_INTERVAL*10

  if now > deadline
    puts "KEEP ALIVE: session #{self} is dead." if Conf.DISPLAY_KEEP_ALIVE
    return false
  end

  if Conf.DISPLAY_KEEP_ALIVE
    puts "KEEP ALIVE: session #{self} is alive(INT: #{now - @last_keep_alive})."
    puts "KEEP ALIVE: send #{self} to keep alive."
  end
  send_peer_session_no_recv(:recv_keep_alive)
  true
end

#mq_send_to(ref, method, args = [], callback = nil) ⇒ Object



213
214
215
216
217
218
219
220
221
222
223
# File 'lib/deep-connect/session.rb', line 213

# Sends a message-queue request for +method+ on +ref+ and returns its result.
#
# The request event is recorded in the waiting table under its sequence
# number and pushed onto the export queue. Raises SessionServiceStopped
# (through DC.Raise) unless the session is in the :SERVICING state.
#
# @return whatever the request event's #result yields
def mq_send_to(ref, method, args=[], callback=nil)
  DC.Raise(SessionServiceStopped) unless @status == :SERVICING

  request = Event::MQRequest.request(self, ref, method, args, callback)
  @waiting_mutex.synchronize { @waiting[request.seq] = request }
  @export_queue.push(request)
  request.result
end

#next_request_event_id ⇒ Object

イベントID取得



226
227
228
229
230
# File 'lib/deep-connect/session.rb', line 226

# Returns the next request event id: the counter is incremented under its
# mutex and the incremented value is returned.
def next_request_event_id
  @next_request_event_id_mutex.synchronize { @next_request_event_id += 1 }
end

#peer_uuid ⇒ Object Also known as: peer_id



43
44
45
# File 'lib/deep-connect/session.rb', line 43

# Returns the peer UUID as reported by the deep space.
def peer_uuid
  @deep_space.peer_uuid
end

#receive(ev) ⇒ Object

peerからの受取り



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/deep-connect/session.rb', line 146

# Dispatches an event received from the peer.
#
# * MQ requests are enqueued on their receiver in the calling thread.
# * Iterator call-back, iterator and plain requests are each evaluated by the
#   organizer's evaluator in a freshly started thread.
# * Anything else is treated as a reply: the matching request is removed from
#   the waiting table (by sequence number) and given the event as its result.
#   DC.InternalError is invoked when no matching request is waiting.
#
# The order of the +when+ clauses is significant if the request classes are
# related by inheritance -- NOTE(review): confirm against the Event classes.
def receive(ev)
  case ev
  when Event::MQRequest
    # Deliberately not run in a separate thread.
    ev.receiver.enq(self, ev)
  when Event::IteratorCallBackRequest
    Thread.start { @organizer.evaluator.evaluate_block_yield(self, ev) }
  when Event::IteratorRequest
    Thread.start { @organizer.evaluator.evaluate_iterator_request(self, ev) }
  when Event::Request
    Thread.start { @organizer.evaluator.evaluate_request(self, ev) }
  else
    pending = @waiting_mutex.synchronize { @waiting.delete(ev.seq) }
    DC.InternalError "対応する request eventがありません(#{ev.inspect})" unless pending
    pending.result = ev
  end
end

#recv_class_specs_impl(specs_dump) ⇒ Object



311
312
313
314
315
# File 'lib/deep-connect/session.rb', line 311

# Unmarshals +specs_dump+ and stores the result as the deep space's class
# specs. Returns the unmarshalled specs.
#
# NOTE(review): specs_dump appears to be the payload built by
# send_class_specs on the other side of the connection; Marshal.load is
# unsafe on untrusted data -- confirm the peer is trusted.
def recv_class_specs_impl(specs_dump)
  @deep_space.class_specs = Marshal.load(specs_dump)
end

#recv_disconnect ⇒ Object



253
254
255
# File 'lib/deep-connect/session.rb', line 253

# Handles a disconnect request: asks the organizer to disconnect this
# session's deep space, tagging the reason as :REQUEST_FROM_PEER.
# send_disconnect issues a request with this exact method name.
def recv_disconnect
  @organizer.disconnect_deep_space(@deep_space, :REQUEST_FROM_PEER)
end

#recv_keep_alive_impl ⇒ Object



344
345
346
347
# File 'lib/deep-connect/session.rb', line 344

# Handles a keep-alive ping: records the organizer's current tick as the time
# of the last keep-alive and returns it. Prints a trace line when
# Conf.DISPLAY_KEEP_ALIVE is set.
def recv_keep_alive_impl
  if Conf.DISPLAY_KEEP_ALIVE
    puts "RECV_KEEP_ALIVE"
  end
  @last_keep_alive = @organizer.tick
end

#register_root_impl(id) ⇒ Object



289
290
291
# File 'lib/deep-connect/session.rb', line 289

# Registers the root +id+ with the deep space on behalf of another session.
# The "_impl" suffix matches the method name that send_peer_session builds
# for a :register_root request.
def register_root_impl(id)
  @deep_space.register_root_from_other_session(id)
end

#register_root_to_peer(id) ⇒ Object



284
285
286
287
# File 'lib/deep-connect/session.rb', line 284

# Asks the peer session to register the root +id+ and returns the peer's
# answer as delivered by send_peer_session.
def register_root_to_peer(id)
  # The no_recv variant must not be used here: this call has to stay
  # synchronized with the peer.
  send_peer_session(:register_root, id)
end

#send_class_specs ⇒ Object



306
307
308
309
# File 'lib/deep-connect/session.rb', line 306

# Marshals the organizer's class specs and sends them to the peer as a
# :recv_class_specs session request for which no reply is awaited.
def send_class_specs
  send_peer_session_no_recv(:recv_class_specs, Marshal.dump(Organizer.class_specs))
end

#send_disconnect ⇒ Object



246
247
248
249
250
251
# File 'lib/deep-connect/session.rb', line 246

# Tells the peer to disconnect by exporting a :recv_disconnect request that
# expects no reply. The event is written to the port directly rather than
# going through the export queue. Does nothing (and returns nil) unless the
# session is in the :SERVICING state.
def send_disconnect
  if @status == :SERVICING
    @port.export(Event::SessionRequestNoReply.request(self, :recv_disconnect))
  end
end

#send_peer_session(req, *args) ⇒ Object



232
233
234
235
236
237
238
239
# File 'lib/deep-connect/session.rb', line 232

# Sends a session-level request to the peer and returns its result.
#
# The method name sent is +req+ with "_impl" appended (for example
# :get_service becomes :get_service_impl). The request is recorded in the
# waiting table under its sequence number and pushed onto the export queue.
#
# @return whatever the request event's #result yields
def send_peer_session(req, *args)
  impl_name = "#{req.id2name}_impl".intern
  request = Event::SessionRequest.request(self, impl_name, args)
  @waiting_mutex.synchronize { @waiting[request.seq] = request }
  @export_queue.push(request)
  request.result
end

#send_peer_session_no_recv(req, *args) ⇒ Object



241
242
243
244
# File 'lib/deep-connect/session.rb', line 241

# Sends a session-level request to the peer without registering it in the
# waiting table, so no reply is matched to it.
#
# The method name sent is +req+ with "_impl" appended. Returns the export
# queue the event was pushed onto.
def send_peer_session_no_recv(req, *args)
  impl_name = "#{req.id2name}_impl".intern
  @export_queue.push(Event::SessionRequestNoReply.request(self, impl_name, args))
end

#send_to(ref, method, args = [], &block) ⇒ Object

イベントの生成/送信



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/deep-connect/session.rb', line 175

# Creates a request event for +method+ on +ref+, sends it to the peer and
# returns its result.
#
# When a block is given an iterator request carrying the block is built;
# otherwise a plain request is built. The event is recorded in the waiting
# table under its sequence number and pushed onto the export queue.
#
# Raises SessionServiceStopped (through DC.Raise) unless the session is in
# the :SERVICING state.
#
# @return whatever the request event's #result yields
def send_to(ref, method, args=[], &block)
  unless @status == :SERVICING
    DC.Raise SessionServiceStopped
  end

  # block_given? is the documented replacement for the deprecated
  # Kernel#iterator?.
  if block_given?
    ev = Event::IteratorRequest.request(self, ref, method, args, block)
  else
    ev = Event::Request.request(self, ref, method, args)
  end

  @waiting_mutex.synchronize do
    @waiting[ev.seq] = ev
  end
  @export_queue.push ev
  ev.result
end

#start ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/deep-connect/session.rb', line 48

# Puts the session into service.
#
# Records the current organizer tick as the last keep-alive, switches the
# status to :SERVICING, sends the class specs to the peer, and starts two
# threads:
#
# * the import thread reads events from the port, refreshes the keep-alive
#   tick and hands each event to #receive;
# * the export thread pops events from the export queue and writes them to
#   the port.
#
# When either thread sees the connection go away it asks the organizer (from
# a separate thread) to disconnect the deep space with :SESSION_CLOSED and
# then parks itself with Thread.stop; #stop_service is what terminates the
# two threads.
#
# @return [self]
def start
  @last_keep_alive = @organizer.tick

  @status = :SERVICING
  send_class_specs

  @import_thread = Thread.start {
    loop do
      begin
        ev = @port.import
        @last_keep_alive = @organizer.tick
      rescue EOFError, DC::DisconnectClient
        # EOFError: the client had already closed the connection.
        # DisconnectClient: the client connection was lost mid-communication.
        Thread.start do
          @organizer.disconnect_deep_space(@deep_space, :SESSION_CLOSED)
        end
        Thread.stop
      rescue DC::ProtocolError
        # The protocol got out of shape because of some failure. No event was
        # read, so skip this round; falling through would hand nil to
        # #receive, which fails on nil and ends the import thread.
        next
      end
      if @status == :SERVICING
        receive(ev)
      else
        puts "INFO: service is stoped, imported event abandoned(#{ev.inspect})"
      end
    end
  }

  @export_thread = Thread.start {
    loop do
      ev = @export_queue.pop
      if @status == :SERVICING
        begin
          # An export triggered while another export is in progress would
          # deadlock. Open question from the original author: should each
          # export run in its own thread?
          @port.export(ev)
        rescue Errno::EPIPE, DC::DisconnectClient
          # EPIPE: the client has terminated.
          # DisconnectClient: the client connection was lost mid-communication.
          Thread.start do
            @organizer.disconnect_deep_space(@deep_space, :SESSION_CLOSED)
          end
          Thread.stop
        end
      else
        puts "INFO: service is stoped, export event abandoned(#{ev.inspect})"
      end
    end
  }
  self
end

#stop(*opts) ⇒ Object



137
138
139
140
141
142
143
# File 'lib/deep-connect/session.rb', line 137

# Closes the port. An IOError raised by the close is reported as a warning on
# standard output instead of being propagated. +opts+ is accepted but unused.
def stop(*opts)
  @port.close
rescue IOError
  puts "WARN: #{$!}"
end

#stop_service(*opts) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/deep-connect/session.rb', line 103

# Takes the session out of service.
#
# Sets the status to :SERVICE_STOP and, unless +opts+ contains
# :SESSION_CLOSED, shuts down the reading side of the port. If the session
# was in the :SERVICING state beforehand, the import and export threads are
# terminated and every request still waiting for a reply is completed, in
# ascending sequence order, with a reply built from a SessionServiceStopped
# error; the waiting table is then emptied.
#
# Progress messages are printed unless Conf.DISABLE_INFO is set.
def stop_service(*opts)
  unless Conf.DISABLE_INFO
    puts "INFO: STOP_SERVICE: Session: #{self.peer_uuid} #{opts.join(' ')} "
  end

  previous_status = @status
  @status = :SERVICE_STOP

  @port.shutdown_reading unless opts.include?(:SESSION_CLOSED)

  return unless previous_status == :SERVICING

  @import_thread.exit
  @export_thread.exit

  @waiting_mutex.synchronize do
    @waiting.sort_by { |seq, _pending| seq }.each do |_seq, pending|
      begin
        unless Conf.DISABLE_INFO
          print "WARN: Remain events: "
          p pending
        end
        # Raised only so that the rescue below can wrap the resulting
        # exception into the reply.
        DC.Raise SessionServiceStopped
      rescue
        pending.result = pending.reply(nil, $!)
      end
    end
    @waiting.clear
  end
end