Class: Eventr::SupervisedObject

Inherits:
Object
  • Object
show all
Defined in:
lib/eventr/actors.rb

Direct Known Subclasses

Consumer, Publisher

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#on_exception ⇒ Object

Returns the value of attribute on_exception.



24
25
26
# File 'lib/eventr/actors.rb', line 24

# Reader for the exception callback.
#
# The application thread calls this object with the rescued error when it
# responds to +call+.
#
# @return [Object, nil] the value held in @on_exception (nil when never set)
def on_exception
  instance_variable_get(:@on_exception)
end

Instance Method Details

#application ⇒ Object



39
40
41
# File 'lib/eventr/actors.rb', line 39

# Looks up the application thread in the thread registry.
#
# @return [Thread, nil] the entry stored under :application, or nil when absent
def application
  threads.fetch(:application, nil)
end

#sleep_time_from_backoff ⇒ Object



51
52
53
54
# File 'lib/eventr/actors.rb', line 51

# Converts the current thread's :backoff level into a sleep duration.
#
# The result is the Fibonacci number at index +backoff+ (0, 1, 1, 2, 3, 5, ...),
# where a missing :backoff thread-local counts as 0.
#
# @return [Integer] the value to sleep for
def sleep_time_from_backoff
  level = Thread.current[:backoff] || 0
  older = 1
  newer = 0
  # One step per element of 0..level, advancing the (older, newer) pair.
  (0..level).each { older, newer = newer, older + newer }
  older
end

#start ⇒ Object



30
31
32
33
# File 'lib/eventr/actors.rb', line 30

# Boots both threads: the application thread first, then the supervisor.
#
# @return [Object] whatever start_supervisor_thread returns
def start
  start_application_thread
  supervisor_thread = start_supervisor_thread
  supervisor_thread
end

#start_application_thread ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/eventr/actors.rb', line 56

# Spawns the application thread unless one is already registered.
#
# The thread runs +main+. When +main+ raises a StandardError, the error is
# passed to +on_exception+ (if it responds to +call+), written out via +warn+,
# and re-raised so the thread terminates with it. On exit the supervisor
# thread, if present and alive, is woken.
#
# @return [Thread] the thread registered under :application
def start_application_thread
  threads[:application] ||= Thread.new do
    begin
      main
    rescue StandardError => e
      on_exception.call(e) if on_exception.respond_to? :call
      warn "#{e.class.name}: #{e.message}\n\t#{Array(e.backtrace).join("\n\t")}"
      raise
    ensure
      # Wake the supervisor so it can help us recover. It may not be registered
      # yet (main failed before the supervisor was started) or may already be
      # dead; an unguarded wakeup would raise here and mask the original error.
      watchdog = threads[:supervisor]
      watchdog.wakeup if watchdog && watchdog.alive?
    end
  end
end

#start_supervisor_thread ⇒ Object

rubocop:disable MethodLength



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/eventr/actors.rb', line 70

# Spawns the supervisor thread unless one is already registered.
#
# The supervisor loops: whenever the application thread is missing or dead it
# clears the registry slot, calls start_application_thread and resets its
# backoff. After a restart, or after five one-second checks without one, it
# parks itself with Thread.stop until woken.
#
# If the loop itself raises a StandardError the supervisor logs it, sleeps for
# sleep_time_from_backoff and retries, raising the backoff level each time.
# Once the level exceeds 15 it gives up: Error::SupervisorDown is raised in
# the application thread (when one is registered) and the original error is
# re-raised, ending the supervisor.
#
# @return [Thread] the thread registered under :supervisor
def start_supervisor_thread # rubocop:disable MethodLength
  threads[:supervisor] ||= Thread.new do
    Thread.current[:backoff] = 1

    begin
      runs = 5
      loop do
        unless application && application.alive?
          puts "#{self.class.name}::Supervisor: cleaning up app thread and restarting it."
          threads[:application] = nil
          start_application_thread

          # stop when we've successfully cleaned something up
          runs = 0

          # and make sure to reset backoff
          Thread.current[:backoff] = 1
        end

        # check for required cleanup 5 times over as many seconds
        if (runs -= 1) <= 0
          Thread.stop
          runs = 5
        end

        sleep 1
      end
    rescue StandardError => e
      warn "#{e.class.name}: #{e.message}\n\t#{Array(e.backtrace).join("\n\t")}"

      if Thread.current[:backoff] <= 15
        Thread.current[:backoff] += 1
        sleep_time = sleep_time_from_backoff
        warn "sleeping for #{sleep_time} before restarting supervisor"
        # Must reference the local computed above; a misspelled name here
        # raises NameError inside the rescue clause and kills the supervisor
        # before it can retry.
        sleep sleep_time
        retry
      end

      # if the supervisor goes away, take the whole thing down.
      error_msg = "supervisor went away due to: #{e.class.name}: #{e.message} -> #{Array(e.backtrace).first}"
      # The slot may be nil if the failure happened between clearing it and
      # registering a replacement thread.
      doomed = threads[:application]
      doomed.raise Error::SupervisorDown, error_msg if doomed

      raise
    end
  end
end

#stop ⇒ Object



26
27
28
# File 'lib/eventr/actors.rb', line 26

# Kills every thread currently held in the registry.
#
# Entries are not removed from the registry; they are only sent +kill+.
#
# @return [Array<Thread>] the threads that were sent +kill+
def stop
  workers = threads.values
  workers.each(&:kill)
end

#supervisor ⇒ Object



43
44
45
# File 'lib/eventr/actors.rb', line 43

# Looks up the supervisor thread in the thread registry.
#
# @return [Thread, nil] the entry stored under :supervisor, or nil when absent
def supervisor
  threads.fetch(:supervisor, nil)
end

#threads ⇒ Object



35
36
37
# File 'lib/eventr/actors.rb', line 35

# Lazily created registry of this object's threads, keyed by role
# (:application and :supervisor are the keys used by the methods here).
#
# @return [Hash] the same Hash instance on every call
def threads
  @threads = {} unless @threads
  @threads
end