Class: Dnsruby::ResolverEM
- Inherits:
-
Object
- Object
- Dnsruby::ResolverEM
- Defined in:
- lib/Dnsruby/Resolver.rb
Overview
This class implements the I/O using EventMachine. NOTE - EM does not work properly on Windows with version 0.8.1 - do not use!
Defined Under Namespace
Classes: PersistentData
Constant Summary collapse
- TIMER_PERIOD =
:nodoc: all
0.1
Instance Method Summary collapse
- #cancel_queries(persistent_data) ⇒ Object
-
#initialize(parent) ⇒ ResolverEM
constructor
A new instance of ResolverEM.
- #process_eventmachine_timers(persistent_data) ⇒ Object
- #remove_server(server, persistent_data) ⇒ Object
-
#reset_attributes ⇒ Object
:nodoc: all.
- #return_to_client(deferrable, client_queue, client_query_id, answer, error) ⇒ Object
-
#send_async(*args) ⇒ Object
msg, client_queue=nil, client_query_id=nil).
- #send_new_em_query(single_resolver, msg, client_queue, client_query_id, persistent_data) ⇒ Object
Constructor Details
#initialize(parent) ⇒ ResolverEM
Returns a new instance of ResolverEM.
557 558 559 |
# File 'lib/Dnsruby/Resolver.rb', line 557 def initialize(parent) @parent=parent end |
Instance Method Details
#cancel_queries(persistent_data) ⇒ Object
697 698 699 700 701 702 703 704 |
# File 'lib/Dnsruby/Resolver.rb', line 697 def cancel_queries(persistent_data) TheLog.debug("Cancelling EM queries") persistent_data.outstanding.each do |df| df.set_deferred_status :failed, "cancelling", "cancelling" end # Cancel the next tick persistent_data.finish = true end |
#process_eventmachine_timers(persistent_data) ⇒ Object
621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 |
# File 'lib/Dnsruby/Resolver.rb', line 621 def process_eventmachine_timers(persistent_data) if (persistent_data.finish) return end now = Time.now persistent_data.timer_keys_sorted.each do |timeout| if (timeout > now) break end persistent_data.timer_procs[timeout].call persistent_data.timer_procs.delete(timeout) persistent_data.timer_keys_sorted.delete(timeout) end EventMachine::add_timer(TIMER_PERIOD) {process_eventmachine_timers(persistent_data)} end |
#remove_server(server, persistent_data) ⇒ Object
686 687 688 689 690 691 692 693 694 695 |
# File 'lib/Dnsruby/Resolver.rb', line 686 def remove_server(server, persistent_data) # Go through persistent_data.timeouts and check all the values for that resolver persistent_data.timeouts.each do |key, value| if (value[0] == server) # Remove the server from the list persistent_data.timer_procs.delete(key) persistent_data.timer_keys_sorted.delete(key) end end end |
#reset_attributes ⇒ Object
:nodoc: all
560 561 |
# File 'lib/Dnsruby/Resolver.rb', line 560 def reset_attributes #:nodoc: all end |
#return_to_client(deferrable, client_queue, client_query_id, answer, error) ⇒ Object
706 707 708 709 710 711 712 713 714 715 716 717 |
# File 'lib/Dnsruby/Resolver.rb', line 706 def return_to_client(deferrable, client_queue, client_query_id, answer, error) if (client_queue) client_queue.push([client_query_id, answer, error]) end # We call set_defered_status when done if (error != nil) deferrable.set_deferred_status :failed, answer, error else deferrable.set_deferred_status :succeeded, answer end EventMachineInterface::stop_em_for_resolver(self) end |
#send_async(*args) ⇒ Object
msg, client_queue=nil, client_query_id=nil)
565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 |
# File 'lib/Dnsruby/Resolver.rb', line 565 def send_async(*args) #msg, client_queue=nil, client_query_id=nil) msg=args[0] client_queue=nil client_query_id=nil if (args.length>1) client_queue=args[1] if (args.length > 2) client_query_id = args[2] end end # We want to send the query to the first resolver. # We then want to set up all the timers for all of the events which might happen # (first round timers, retry timers, etc.) # The callbacks for these should be able to cancel any of the rest (including any for broken resolvers) # We can then forget about the query, as all the callbacks will be lodged with EventMachine. EventMachineInterface::start_em_for_resolver(self) persistent_data = PersistentData.new persistent_data.deferrable = EM::DefaultDeferrable.new persistent_data.outstanding = [] persistent_data.to_send = 0 persistent_data.timeouts=@parent.generate_timeouts(Time.now) persistent_data.timer_procs = {} persistent_data.finish = false persistent_data.timeouts.keys.sort.each do |timeout| value = persistent_data.timeouts[timeout] # timeout = timeout.round single_resolver, retry_count = value persistent_data.to_send+=1 df = nil if (timeout == 0) # Send immediately TheLog.debug("Sending first EM query") df = send_new_em_query(single_resolver, msg, client_queue, client_query_id, persistent_data) persistent_data.outstanding.push(df) else # Send later persistent_data.timer_procs[timeout]=Proc.new{ TheLog.debug("Sending #{timeout} delayed EM query") df = send_new_em_query(single_resolver, msg, client_queue, client_query_id, persistent_data) persistent_data.outstanding.push(df) } end end query_timeout = @parent.query_timeout if (query_timeout > 0) persistent_data.timer_procs[Time.now+query_timeout]=Proc.new{ cancel_queries(persistent_data) return_to_client(persistent_data.deferrable, client_queue, client_query_id, nil, ResolvTimeout.new("Query timed out after query_timeout=#{query_timeout.round} seconds")) } end persistent_data.timer_keys_sorted = persistent_data.timer_procs.keys.sort EventMachine::add_timer(0) {process_eventmachine_timers(persistent_data)} return persistent_data.deferrable end |
#send_new_em_query(single_resolver, msg, client_queue, client_query_id, persistent_data) ⇒ Object
637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 |
# File 'lib/Dnsruby/Resolver.rb', line 637 def send_new_em_query(single_resolver, msg, client_queue, client_query_id, persistent_data) df = single_resolver.send_async(msg) # client_queue, client_query_id) persistent_data.to_send-=1 df.callback { |answer| TheLog.debug("Response returned") persistent_data.outstanding.delete(df) cancel_queries(persistent_data) return_to_client(persistent_data.deferrable, client_queue, client_query_id, answer, nil) } df.errback { |response, error| TheLog.debug("Error #{error} returned") persistent_data.outstanding.delete(df) if (response!="cancelling") if (error.kind_of?(ResolvTimeout)) # - if it was a timeout, then check which number it was, and how many retries are expected on that server # - if it was the last retry, on the last server, then return a timeout to the client (and clean up) # - otherwise, continue # Do we have any more packets to send to this resolver? if (persistent_data.outstanding.empty? && persistent_data.to_send==0) TheLog.debug("Sending timeout to client") return_to_client(persistent_data.deferrable, client_queue, client_query_id, response, error) end elsif (error.kind_of?NXDomain) # - if it was an NXDomain, then return that to the client, and stop all new queries (and clean up) TheLog.debug("NXDomain - returning to client") cancel_queries(persistent_data) return_to_client(persistent_data.deferrable, client_queue, client_query_id, response, error) else # - if it was any other error, then remove that server from the list for that query # If a Too Many Open Files error, then don't remove, but let retry work. if (!(error.to_s=~/Errno::EMFILE/)) remove_server(single_resolver, persistent_data) TheLog.debug("Removing #{single_resolver.server} from resolver list for this query") else TheLog.debug("NOT Removing #{single_resolver.server} due to Errno::EMFILE") end # - if it was the last server, then return an error to the client (and clean up) if (persistent_data.outstanding.empty? && persistent_data.to_send==0) # if (outstanding.empty?) TheLog.debug("Sending error to client") return_to_client(persistent_data.deferrable, client_queue, client_query_id, response, error) end end end } return df end |