Class: Dnsruby::ResolverEM

Inherits:
Object
  • Object
show all
Defined in:
lib/Dnsruby/Resolver.rb

Overview

This class implements the resolver's I/O using EventMachine. NOTE: EventMachine version 0.8.1 does not work properly on Windows - do not use this class with that version on Windows!

Defined Under Namespace

Classes: PersistentData

Constant Summary collapse

TIMER_PERIOD =

:nodoc: all

0.1

Instance Method Summary collapse

Constructor Details

#initialize(parent) ⇒ ResolverEM

Returns a new instance of ResolverEM.



557
558
559
# File 'lib/Dnsruby/Resolver.rb', line 557

# Builds the EventMachine I/O helper for a resolver.
#
# @param parent the owning object; send_async later calls
#   +generate_timeouts+ and +query_timeout+ on it
def initialize(parent)
  @parent = parent
end

Instance Method Details

#cancel_queries(persistent_data) ⇒ Object



697
698
699
700
701
702
703
704
# File 'lib/Dnsruby/Resolver.rb', line 697

# Fails every outstanding query deferrable with the "cancelling" marker and
# flags the query as finished, which makes process_eventmachine_timers return
# without doing any further work.
#
# @param persistent_data per-query state; +outstanding+ is read and
#   +finish+ is set to true
def cancel_queries(persistent_data)
  TheLog.debug("Cancelling EM queries")
  # Iterate over a snapshot. The errback registered by send_new_em_query
  # deletes its deferrable from +outstanding+; when that runs while we are
  # still inside Array#each on the same array, every other element is
  # skipped and those queries are never cancelled.
  persistent_data.outstanding.dup.each do |df|
    df.set_deferred_status :failed, "cancelling", "cancelling"
  end
  # Cancel the next tick
  persistent_data.finish = true
end

#process_eventmachine_timers(persistent_data) ⇒ Object



621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
# File 'lib/Dnsruby/Resolver.rb', line 621

# Runs every timer proc whose key is not later than the current time, removes
# it from the bookkeeping collections, and schedules itself again after
# TIMER_PERIOD seconds. Does nothing once +persistent_data.finish+ is set.
#
# @param persistent_data per-query state; reads +finish+ and mutates
#   +timer_procs+ (key => proc) and +timer_keys_sorted+ (sorted keys)
def process_eventmachine_timers(persistent_data)
  return if persistent_data.finish

  now = Time.now
  # Collect the due keys up front. Deleting from timer_keys_sorted while
  # iterating it with #each skips the element after each deletion, which
  # delayed due timers by a tick and ran them out of order.
  due = persistent_data.timer_keys_sorted.take_while { |timeout| timeout <= now }
  due.each do |timeout|
    # A timer proc may have cancelled the query; do not send anything more.
    break if persistent_data.finish

    # The proc can be missing if remove_server dropped this key while an
    # earlier proc of the same pass was running.
    timer_proc = persistent_data.timer_procs[timeout]
    timer_proc.call if timer_proc
    persistent_data.timer_procs.delete(timeout)
    persistent_data.timer_keys_sorted.delete(timeout)
  end
  # No need for another tick once the query has been finished.
  return if persistent_data.finish

  EventMachine::add_timer(TIMER_PERIOD) { process_eventmachine_timers(persistent_data) }
end

#remove_server(server, persistent_data) ⇒ Object



686
687
688
689
690
691
692
693
694
695
# File 'lib/Dnsruby/Resolver.rb', line 686

# Drops the pending timers of every +timeouts+ entry that targets +server+.
# The +timeouts+ hash itself is left untouched.
#
# @param server the resolver whose pending timers should be discarded
# @param persistent_data per-query state; +timeouts+ is scanned, and
#   +timer_procs+ and +timer_keys_sorted+ are pruned
# @return the +timeouts+ hash that was scanned
def remove_server(server, persistent_data)
  persistent_data.timeouts.each_pair do |timer_key, entry|
    # The first element of an entry is the resolver it is destined for.
    next unless entry[0] == server

    persistent_data.timer_procs.delete(timer_key)
    persistent_data.timer_keys_sorted.delete(timer_key)
  end
end

#reset_attributes ⇒ Object

:nodoc: all



560
561
# File 'lib/Dnsruby/Resolver.rb', line 560

# No-op: performs no work and returns nil.
def reset_attributes #:nodoc: all
  nil
end

#return_to_client(deferrable, client_queue, client_query_id, answer, error) ⇒ Object



706
707
708
709
710
711
712
713
714
715
716
717
# File 'lib/Dnsruby/Resolver.rb', line 706

# Delivers the final outcome of a query to the caller.
#
# The triple [client_query_id, answer, error] is pushed onto +client_queue+
# when one was supplied. The deferrable is then completed: :succeeded with
# the answer when +error+ is nil, otherwise :failed with the answer and the
# error. Finally EventMachineInterface is told to stop EM for this resolver.
#
# @param deferrable the deferrable handed back to the client by send_async
# @param client_queue optional queue; skipped when nil/false
# @param client_query_id identifier pushed along with the result
# @param answer the response (may be nil on failure)
# @param error nil on success, otherwise the failure to report
def return_to_client(deferrable, client_queue, client_query_id, answer, error)
  client_queue.push([client_query_id, answer, error]) if client_queue

  # Completing the deferrable is what notifies the client.
  if error.nil?
    deferrable.set_deferred_status :succeeded, answer
  else
    deferrable.set_deferred_status :failed, answer, error
  end
  EventMachineInterface::stop_em_for_resolver(self)
end

#send_async(*args) ⇒ Object

Accepts the arguments (msg, client_queue=nil, client_query_id=nil).



565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
# File 'lib/Dnsruby/Resolver.rb', line 565

# Starts an asynchronous query and returns a deferrable for its result.
#
# Positional arguments are (msg, client_queue = nil, client_query_id = nil).
#
# A PersistentData record is created for the query. For every entry produced
# by the parent's generate_timeouts, either the query is sent straight away
# (key equal to 0) or a proc that sends it is stored in +timer_procs+ under
# that key. When the parent's query_timeout is positive, one more proc is
# stored that cancels everything and reports a ResolvTimeout to the client.
# The procs are later driven by process_eventmachine_timers, which is first
# scheduled here with a zero-delay EventMachine timer.
#
# @return the deferrable stored in the PersistentData record
def send_async(*args) #msg, client_queue=nil, client_query_id=nil)
  # Arguments that were not supplied simply come through as nil.
  msg, client_queue, client_query_id = args

  EventMachineInterface::start_em_for_resolver(self)

  state = PersistentData.new
  state.deferrable = EM::DefaultDeferrable.new
  state.outstanding = []
  state.to_send = 0
  state.timeouts = @parent.generate_timeouts(Time.now)
  state.timer_procs = {}
  state.finish = false

  state.timeouts.keys.sort.each do |timeout|
    single_resolver, _retry_count = state.timeouts[timeout]
    state.to_send += 1

    # Sends one query and tracks its deferrable as outstanding.
    dispatch = lambda do
      deferred = send_new_em_query(single_resolver, msg, client_queue, client_query_id, state)
      state.outstanding.push(deferred)
    end

    if timeout == 0
      # Nothing to wait for - send now.
      TheLog.debug("Sending first EM query")
      dispatch.call
    else
      # Park the send until the timer loop reaches this key.
      state.timer_procs[timeout] = proc do
        TheLog.debug("Sending #{timeout} delayed EM query")
        dispatch.call
      end
    end
  end

  query_timeout = @parent.query_timeout
  if query_timeout > 0
    # Overall deadline for the whole query, independent of per-server retries.
    deadline = Time.now + query_timeout
    state.timer_procs[deadline] = proc do
      cancel_queries(state)
      timeout_error = ResolvTimeout.new("Query timed out after query_timeout=#{query_timeout.round} seconds")
      return_to_client(state.deferrable, client_queue, client_query_id, nil, timeout_error)
    end
  end

  state.timer_keys_sorted = state.timer_procs.keys.sort
  EventMachine::add_timer(0) { process_eventmachine_timers(state) }
  state.deferrable
end

#send_new_em_query(single_resolver, msg, client_queue, client_query_id, persistent_data) ⇒ Object



637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
# File 'lib/Dnsruby/Resolver.rb', line 637

# Sends +msg+ through +single_resolver+ and wires up the handlers that decide
# what the client eventually sees.
#
# On success, the remaining queries are cancelled and the answer is returned
# to the client. On failure:
# * a "cancelling" response is ignored (it comes from cancel_queries);
# * a ResolvTimeout is reported only when nothing is outstanding and nothing
#   is left to send;
# * an NXDomain cancels everything and is reported at once;
# * any other error removes the resolver from this query's timers (unless the
#   error text mentions Errno::EMFILE, so that retries can still happen) and
#   is reported only when nothing is outstanding and nothing is left to send.
#
# @param single_resolver resolver to send through; must respond to
#   +send_async+ and +server+
# @param msg the message to send
# @param client_queue passed through to return_to_client
# @param client_query_id passed through to return_to_client
# @param persistent_data per-query state; +to_send+ is decremented and
#   +outstanding+ is pruned as results arrive
# @return the deferrable returned by the resolver's send_async
def send_new_em_query(single_resolver, msg, client_queue, client_query_id, persistent_data)
  df = single_resolver.send_async(msg) # client_queue, client_query_id)
  persistent_data.to_send -= 1

  # True when no query is in flight and none is waiting to be sent.
  exhausted = lambda { persistent_data.outstanding.empty? && persistent_data.to_send == 0 }
  # Hands a result over to the client.
  reply = lambda do |answer, error|
    return_to_client(persistent_data.deferrable, client_queue, client_query_id, answer, error)
  end

  df.callback do |answer|
    TheLog.debug("Response returned")
    persistent_data.outstanding.delete(df)
    cancel_queries(persistent_data)
    reply.call(answer, nil)
  end

  df.errback do |response, error|
    TheLog.debug("Error #{error} returned")
    persistent_data.outstanding.delete(df)
    # Failures produced by cancel_queries need no further handling.
    next if response == "cancelling"

    case error
    when ResolvTimeout
      # Only the last timeout of the last server reaches the client;
      # otherwise keep waiting for the other queries.
      if exhausted.call
        TheLog.debug("Sending timeout to client")
        reply.call(response, error)
      end
    when NXDomain
      # Definitive answer: stop all other queries and report it.
      TheLog.debug("NXDomain - returning to client")
      cancel_queries(persistent_data)
      reply.call(response, error)
    else
      # Too Many Open Files is left alone so the retry can still work;
      # any other error drops this server for the current query.
      if error.to_s =~ /Errno::EMFILE/
        TheLog.debug("NOT Removing #{single_resolver.server} due to Errno::EMFILE")          
      else
        remove_server(single_resolver, persistent_data)
        TheLog.debug("Removing #{single_resolver.server} from resolver list for this query")
      end
      # Report the error only if this was the last server.
      if exhausted.call
        TheLog.debug("Sending error to client")
        reply.call(response, error)
      end
    end
  end

  df
end