Class: Mqlight::UnsecureEndPoint
- Inherits:
-
Object
- Object
- Mqlight::UnsecureEndPoint
- Includes:
- Logging
- Defined in:
- lib/mqlight/connection.rb
Direct Known Subclasses
Instance Method Summary collapse
- #incoming_thread ⇒ Object
-
#initialize(args) ⇒ UnsecureEndPoint
constructor
A new instance of UnsecureEndPoint.
- #outgoing_thread ⇒ Object
- #read_socket ⇒ Object
- #retrying? ⇒ Boolean
- #start_connection_threads ⇒ Object
-
#stop_threads ⇒ Object
Stop the IO threads.
- #stopped? ⇒ Boolean
- #stopping? ⇒ Boolean
Methods included from Logging
Constructor Details
#initialize(args) ⇒ UnsecureEndPoint
Returns a new instance of UnsecureEndPoint.
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 |
# File 'lib/mqlight/connection.rb', line 291 def initialize(args) logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } parms = Hash[method(__method__).parameters.map do |parm| [parm[1], eval(parm[1].to_s)] end] logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s } @thread_vars = args[:thread_vars] @proton = @thread_vars.proton @service = @thread_vars.service hostname = @service.host port = @service.port begin @tcp_transport = TCPSocket.open(hostname, port) @proton.sockets_open = true rescue => e logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s } raise Mqlight::NetworkError, e.to_s end @transport = @tcp_transport logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } end |
Instance Method Details
#incoming_thread ⇒ Object
355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 |
# File 'lib/mqlight/connection.rb', line 355 def incoming_thread logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } until stopped? or not @proton.sockets_open? begin logger.often(@id, 'Waiting for incoming message') do self.class.to_s + '#' + __method__.to_s end msg = read_socket break if stopped? logger.often(@id, 'New incoming message size=' + msg.size.to_s) do self.class.to_s + '#' + __method__.to_s end unless msg.nil? # TODO: a null length message is treated as stream close. # TODO: but could the server not send a blank message? # TODO: get a way to detect remote end disconnect. break if msg.nil? || msg.size == 0 until msg.nil? || msg.size == 0 n = @proton.proton_push(msg) if n == -2 msg = nil elsif n <= 0 # Busy - try later. sleep(0.2) elsif n < msg.size # trim the message msg = msg[n..msg.length] else # Delivered msg = nil end end rescue Errno::ECONNRESET, EOFError => e logger.data(@id, 'Connection remotely terminated') do self.class.to_s + '#' + __method__.to_s end @proton.sockets_open = false unless stopped? || stopping? # A race condition can occur here as this error # could be processed before the preceeding CLOSE # message is read. This then results in a Retry # So ... delaying the report to ensure any CLOSE # message is processed first sleep 0.5 ne = Mqlight::NetworkError.new( 'Connection remotely terminated [' + e.to_s + ']') @thread_vars.change_state(:retrying, ne) end break rescue => e logger.data(@id, "Exception: #{e}") do self.class.to_s + '#' + __method__.to_s end logger.ffdc(self.class.to_s + '#' + __method__.to_s, 'ffdc002', self, 'Uncaught exception', e) end end @proton.sockets_open = false logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } end |
#outgoing_thread ⇒ Object
432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 |
# File 'lib/mqlight/connection.rb', line 432 def outgoing_thread logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } deliver = @thread_vars.proton.(@service) until stopped? or not @proton.sockets_open? begin logger.often(@id, 'Waiting for outgoing message') do self.class.to_s + '#' + __method__.to_s end msg = deliver.get if msg.nil? # Wait a little and wake-up proton in case a heart beat is required. deliver.empty_pop sleep(0.01) else logger.often(@id, 'Outgoing message size=' + msg.size.to_s) do self.class.to_s + '#' + __method__.to_s end @transport.write(msg) @transport.flush end rescue Errno::EPIPE => e logger.data(@id, 'Connection remotely terminated') do self.class.to_s + '#' + __method__.to_s end @proton.sockets_open = false # A race condition can occur here as this error # could be processed before the preceeding CLOSE # message is read. This then results in a Retry # So ... delaying the report to ensure any CLOSE # message is processed first sleep 0.5 ne = Mqlight::NetworkError.new( 'Connection remotely terminated [' + e.to_s + ']') @thread_vars.change_state(:retrying, ne) rescue => e logger.ffdc(self.class.to_s + '#' + __method__.to_s, 'ffdc003', self, 'Uncaught exception', e) end end logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } rescue => e logger.ffdc(self.class.to_s + '#' + __method__.to_s, 'ffdc004', self, 'Uncaught exception', e) end |
#read_socket ⇒ Object
425 426 427 |
# File 'lib/mqlight/connection.rb', line 425 def read_socket @transport.recv(1024) end |
#retrying? ⇒ Boolean
496 497 498 |
# File 'lib/mqlight/connection.rb', line 496 def @thread_vars.state == :retrying end |
#start_connection_threads ⇒ Object
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 |
# File 'lib/mqlight/connection.rb', line 319 def start_connection_threads logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } @incoming = Thread.new do incoming_thread end @outgoing = Thread.new do outgoing_thread end # Time for the threads to start sleep(0.1) logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } end |
#stop_threads ⇒ Object
Stop the IO threads
338 339 340 341 342 343 344 345 346 347 348 349 350 |
# File 'lib/mqlight/connection.rb', line 338 def stop_threads logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } begin @transport.shutdown(:WR) rescue => e logger.data(@id, 'Ignored: shutdown error ' + e.to_s) do self.class.to_s + '#' + __method__.to_s end end @incoming.kill @outgoing.kill logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } end |
#stopped? ⇒ Boolean
482 483 484 |
# File 'lib/mqlight/connection.rb', line 482 def stopped? @thread_vars.state == :stopped end |
#stopping? ⇒ Boolean
489 490 491 |
# File 'lib/mqlight/connection.rb', line 489 def stopping? @thread_vars.state == :stopping end |