Class: ReReplay::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/rereplay/ruby_runner.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(input = nil) ⇒ Runner

Returns a new instance of Runner.



14
15
16
17
18
19
20
# File 'lib/rereplay/ruby_runner.rb', line 14

# Creates a runner with empty periodic and request monitor lists.
#
# input - optional request source; when given (non-nil) it is handed to
#         #input=, which decides how to interpret it.
def initialize(input=nil)
  self.input = input unless input.nil?
  @periodic_monitors, @request_monitors = [], []
end

Instance Attribute Details

#periodic_monitors ⇒ Object

Returns the value of attribute periodic_monitors.



11
12
13
# File 'lib/rereplay/ruby_runner.rb', line 11

# Returns the array of periodic monitors (the array itself, not a copy).
# It starts out empty; #run calls tick(elapsed_seconds) on each entry in its
# own thread, every `interval` seconds when the monitor responds to
# #interval and every 5 seconds otherwise.
def periodic_monitors
  @periodic_monitors
end

#request_monitors ⇒ Object

Returns the value of attribute request_monitors.



12
13
14
# File 'lib/rereplay/ruby_runner.rb', line 12

# Returns the array of request monitors (the array itself, not a copy).
# It starts out empty; #run calls start(request) and/or finish(request) on
# the entries that respond to those methods, once per replayed request.
def request_monitors
  @request_monitors
end

Instance Method Details

#input=(input) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/rereplay/ruby_runner.rb', line 22

# Sets the requests to replay.
#
# Accepted sources, checked in this order:
# * an Array - stored as-is (no parsing, no copy);
# * anything responding to #readlines (e.g. an open File or StringIO) -
#   every line is parsed;
# * anything responding to #split (e.g. a String) - split on "\n" and
#   every line is parsed.
#
# A text line has the form "time, method, url" and becomes
# [Float, Symbol, String]. Blank lines are skipped. Only the first two
# commas separate fields, so the URL itself may contain commas. A line
# with fewer than two fields is kept with the method missing, so that
# validate_input can report it instead of failing here with NoMethodError.
#
# Raises RuntimeError when input is none of the accepted sources.
def input=(input)
  if input.is_a?(Array)
    @input = input
    return
  end

  lines =
    if input.respond_to?(:readlines)
      input.readlines
    elsif input.respond_to?(:split)
      input.split("\n")
    else
      raise "Invalid input, expected Array, #readlines, or #split"
    end

  stripped = lines.map { |line| line.strip }.reject { |line| line.empty? }
  @input = stripped.map do |line|
    fields = line.split(",", 3).map { |field| field.strip }
    fields[0] = fields[0].to_f
    fields[1] = fields[1].to_sym if fields[1]
    fields
  end
end

#prepare ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/rereplay/ruby_runner.rb', line 61

# Rewrites @input according to the active profile before a run.
#
# 1. Looping: when profile[:when_input_consumed] is :loop and the latest
#    scheduled time is shorter than profile[:run_for], the rows are repeated
#    (each copy shifted by a multiple of that latest time) until the schedule
#    covers run_for.
# 2. Ramp-up: unless profile[:rampup] is [1.0, 1.0], and when
#    profile[:rampup_method] is :linear, the gap between consecutive rows is
#    multiplied by a factor that moves linearly from 1/rampup[0] at time 0 to
#    1/rampup[1] at the end of the schedule.
#
# Rows are [time, ...] arrays. The ramp-up walks them in their current order
# and rewrites row[0] in place, so it presumably expects @input to be sorted
# by time - NOTE(review): confirm with callers.
#
# NOTE: not idempotent - a second call loops/rescales the already prepared
# @input again.
def prepare
  p = profile

  max_time = @input.max {|a, b| a[0] <=> b[0]}[0]

  loop_count = 1
  # max_time > 0: when every request is scheduled at 0 there is no span to
  # repeat, and run_for / 0 is Infinity, whose #ceil raises FloatDomainError.
  if p[:when_input_consumed] == :loop && max_time > 0 && max_time < p[:run_for]
    loop_count = (p[:run_for].to_f / max_time).ceil
  end

  if loop_count > 1
    first_pass = @input
    2.upto(loop_count) do |pass|
      offset = max_time * (pass - 1)
      shifted_rows = first_pass.map do |row|
        shifted = row.dup
        shifted[0] += offset
        shifted
      end
      @input += shifted_rows
    end
  end

  real_max_time = [max_time * loop_count, p[:run_for]].min
  rampup_requested = p[:rampup][0] != p[:rampup][1] || p[:rampup][0] != 1.0
  # real_max_time > 0: a zero-length schedule cannot be rescaled, and
  # time / 0 would turn every offset into NaN.
  if rampup_requested && real_max_time > 0
    case p[:rampup_method]
    when :linear
      sr = 1.0 / p[:rampup][0]
      fr = 1.0 / p[:rampup][1]
      prev_time = 0
      new_prev_time = 0
      @input.map! do |a|
        time = a[0].to_f
        fraction = sr + (fr - sr)*(time / real_max_time)
        # Scale only the gap to the previous row, then re-anchor it on the
        # previous row's already rescaled time.
        a[0] = (time - prev_time)*fraction + new_prev_time
        prev_time = time
        new_prev_time = a[0]
        a
      end
    end
  end
end

#profile ⇒ Object



243
244
245
246
247
248
# File 'lib/rereplay/ruby_runner.rb', line 243

# Returns the active profile hash. When none has been assigned yet, an empty
# hash is first passed through #profile= so that the stored profile is
# whatever that setter produces for it.
def profile
  self.profile = {} if @profile.nil?
  @profile
end

#profile=(new_profile) ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/rereplay/ruby_runner.rb', line 231

# Replaces the profile with the defaults below, overridden by the entries of
# new_profile when it is a Hash. Any other argument is ignored and leaves
# the plain defaults in place.
#
# Defaults:
#   :run_for             => 5
#   :when_input_consumed => :stop
#   :timeout             => 1
#   :rampup              => [1.0, 1.0]
#   :rampup_method       => :linear
def profile=(new_profile)
  defaults = {
    :run_for => 5,
    :when_input_consumed => :stop,
    :timeout => 1,
    :rampup => [1.0, 1.0],
    :rampup_method => :linear
  }
  @profile = defaults
  @profile.merge!(new_profile) if new_profile.is_a?(Hash)
end

#run ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/rereplay/ruby_runner.rb', line 106

# Replays @input over HTTP and blocks until every request has completed or
# profile[:run_for] seconds have elapsed, whichever comes first.
#
# Steps:
# * validate_input and prepare are applied to @input;
# * a gatekeeper thread pushes each request onto a queue once its scheduled
#   offset (row[0], seconds since the start) has been reached;
# * 20 worker threads pop requests and issue them with Net::HTTP, using
#   profile[:timeout] as the read timeout; a timed-out request gets the
#   status :timeout, otherwise the status is the response code;
# * request monitors responding to #start / #finish are called with an
#   OpenStruct (url, scheduled_start, index, http_method, actual_start and,
#   for finish, finish and status);
# * each periodic monitor gets its own thread calling tick(elapsed) every
#   `interval` seconds (5 when the monitor has no #interval);
# * a timeout thread flags the end of the run after profile[:run_for].
# All helper threads are killed when the method exits, including on error.
#
# Raises ArgumentError (via validate_input) for invalid input. An exception
# raised by a finish monitor is re-raised in the calling thread.
#
# NOTE: sets Thread.abort_on_exception = true for the whole process and
# does not restore the previous value.
def run
  validate_input
  p = profile
  # request monitors with a start method
  request_monitors_start = request_monitors.select {|mon| mon.respond_to? :start}
  # request monitors with a finish method
  request_monitors_finish = request_monitors.select {|mon| mon.respond_to? :finish}
  tg = ThreadGroup.new
  q = Queue.new
  # The queue doubles as the monitor guarding done / ready_for_processing /
  # test_duration_exceeded.
  q.extend MonitorMixin
  waiters_cond = q.new_cond
  
  prepare
  start_time = nil
  index = 0
  # Work on copies so the rows in @input stay untouched.
  # NOTE(review): slot 3 is validated as "nil or a Hash" but is overwritten
  # with the request index here, so a supplied Hash is never used.
  requests_to_make = @input.map do |r| 
    a = r.dup
    a[3] = index
    index += 1
    a
  end
  thread_count = 20
  done = 0
  total_requests = @input.length
  # Largest observed lag between scheduled and actual start; tracked by the
  # workers but not reported anywhere in this method.
  max_delay = 0
  parent_thread = Thread.current
  Thread.abort_on_exception = true
  ready_for_processing = false
  gatekeeper = Thread.new do
    # Wait until the main thread has fixed start_time.
    q.synchronize do
      waiters_cond.wait_until { ready_for_processing }
    end
    until requests_to_make.empty?
      task = requests_to_make.shift
      since_start = Time.new - start_time
      time_until_next_task = task[0] - since_start
    
      if(time_until_next_task > 0)
        sleep time_until_next_task
      end
      
      q << task
    end
  end
  
  thread_count.times do
    t = Thread.new do
      while true
        task = q.pop
        now = Time.new
        since_start = now - start_time
        delay = since_start - task[0]
        if(delay > max_delay) then max_delay = delay; end
        url = URI.parse(task[2])
        # Honour the validated method (:get or :head) and send the full
        # request URI: url.path alone drops the query string and is empty
        # for URLs without a path.
        request_class = task[1] == :head ? Net::HTTP::Head : Net::HTTP::Get
        req = request_class.new(url.request_uri)
        request = OpenStruct.new(:url => task[2], :scheduled_start => task[0], :index => task[3], :http_method => task[1])
        # this connection can actually take ~300ms...is there a better way?
        Net::HTTP.start(url.host, url.port) do |http|
          http.read_timeout = p[:timeout]
          status = nil
          begin
            #request.actual_start = Time.now - start_time
            request.actual_start = now - start_time
            # Notify start monitors before the request goes out, so they
            # also see requests that later time out.
            request_monitors_start.each {|mon| mon.start(request)}
            resp = http.request(req)
          rescue Timeout::Error
            status = :timeout
          end
          if status.nil?
            status = resp.code
          end
          time_finished = Time.now - start_time
          request.finish = time_finished
          request.status = status
          begin
            request_monitors_finish.each {|mon| mon.finish(request)}
          rescue => e
            # Surface monitor failures in the thread that called #run.
            parent_thread.raise e
          end
          q.synchronize do
            done += 1
            waiters_cond.broadcast
          end
        end
      end
    end
    tg.add t
  end
  test_duration_exceeded = false
  # Fix the time origin and release the gatekeeper.
  q.synchronize do
    ready_for_processing = true
    start_time = Time.now
    waiters_cond.broadcast
  end
  timeout_thread = Thread.new do
    sleep_duration = start_time + p[:run_for] - Time.now
    sleep sleep_duration
    q.synchronize do
      test_duration_exceeded = true
      waiters_cond.broadcast
    end
  end
  periodic_monitor_threads = []
  periodic_monitors.each do |mon|
    interval = mon.respond_to?(:interval) ? mon.interval : 5
    periodic_monitor_threads << Thread.new do
      i = 0
      while true
        mon.tick(Time.now - start_time)
        i += 1
        # Schedule against start_time rather than "now" so ticks do not drift.
        time_to_next = start_time + (interval * i) - Time.now
        sleep time_to_next if time_to_next > 0
      end
    end
  end
  q.synchronize do
    waiters_cond.wait_while { done < total_requests && !test_duration_exceeded }
  end
ensure
  gatekeeper.kill if gatekeeper
  tg.list.each {|t| t.kill} if tg
  periodic_monitor_threads.each {|t| t.kill} if periodic_monitor_threads
  # Without this the timeout thread keeps sleeping for the rest of run_for
  # after all requests have already finished.
  timeout_thread.kill if timeout_thread
end

#validate_input ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/rereplay/ruby_runner.rb', line 39

# Checks that @input is a non-empty list of well-formed request rows.
#
# Each row is inspected by position:
#   [0] must be Numeric
#   [1] must be one of the symbols :get / :head
#   [2] must be a String
#   [3] must be nil or a Hash
#
# Raises ArgumentError naming the first offending row (1-based) and slot.
def validate_input
  raise ArgumentError, "Nothing to process (input was empty)" if @input.nil? || @input.empty?

  allowed_methods = [:get, :head]
  @input.each_with_index do |row, idx|
    position = idx + 1
    unless row[0].is_a?(Numeric)
      raise ArgumentError, "Expected element at index 0 of input #{position} to be Numeric; was #{row[0]}"
    end
    unless row[1].is_a?(Symbol) && allowed_methods.include?(row[1])
      raise ArgumentError, "Expected element at index 1 of input #{position} to be a symbol in #{allowed_methods.inspect}; was #{row[1].inspect}"
    end
    unless row[2].is_a?(String)
      raise ArgumentError, "Expected element at index 2 of input #{position} to be a String; was #{row[2]}"
    end
    unless row[3].nil? || row[3].is_a?(Hash)
      raise ArgumentError, "Expected element at index 3 of input #{position} to be nil or a Hash; was #{row[3]}"
    end
    # TODO post data
  end
end