Class: Streamworker::Workers::Worker

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/streamworker/workers/worker.rb

Direct Known Subclasses

ShopifyWorker

Constant Summary collapse

# Query count that, together with TIME_PER_BLOCK, defines the theoretical
# processing rate used by #calculate_times.
QUERIES_PER_BLOCK =
500
# Time budget paired with QUERIES_PER_BLOCK in #calculate_times; it is scaled
# against elapsed wall-clock seconds there, so presumably seconds as well.
TIME_PER_BLOCK =
300
# #imminent_timeout? reports true once the remaining work time drops below
# this value.
TIMEOUT_MARGIN =
5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(view_context, opts = {}) ⇒ Worker

Returns a new instance of Worker.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/streamworker/workers/worker.rb', line 17

# Builds a worker bound to a view context.
#
# @param view_context [Object] stored as-is; #head calls its asset-tag helpers
# @param opts [Hash] options, with string or symbol keys. Keys read here:
#   :repeats (default 1), :num_records (default 1) and :unicorn_timeout
#   (falls back to AppConfig.unicorn_timeout when AppConfig is defined, then
#   ENV['UNICORN_TIMEOUT'], then 30)
def initialize(view_context, opts={})
  @opts = opts.with_indifferent_access
  @view_context = view_context

  @title = "Working..."
  # Read through @opts so string-keyed option hashes are honoured as well.
  @repeats = (@opts[:repeats] || 1).to_i
  @fragment = false
  @started_at = Time.now

  @opts[:unicorn_timeout] ||= AppConfig.unicorn_timeout if defined?(AppConfig)
  @opts[:unicorn_timeout] ||= ENV['UNICORN_TIMEOUT']
  @opts[:unicorn_timeout] ||= 30
  @opts[:unicorn_timeout] = @opts[:unicorn_timeout].to_i

  # nil.to_i is 0, which is truthy in Ruby, so the default has to be applied
  # before the conversion rather than after it.
  @num_records = (@opts[:num_records] || 1).to_i
  @num_success = 0
  @num_errors = 0

  @footer_messages = []
end

Instance Attribute Details

Returns the value of attribute footer_messages.



15
16
17
# File 'lib/streamworker/workers/worker.rb', line 15

# Messages queued for the page footer. Starts as an empty array in
# #initialize; #footer and #push_footer_message append to it.
def footer_messages
  @footer_messages
end

#line_num ⇒ Object

subclasses responsible for setting this as appropriate



12
13
14
# File 'lib/streamworker/workers/worker.rb', line 12

# Line number interpolated into the badges built by #success_line_num and
# #error_line_num. Nothing in the code shown here assigns it; subclasses are
# responsible for setting it as appropriate.
def line_num
  @line_num
end

#num_errors ⇒ Object

Returns the value of attribute num_errors.



14
15
16
# File 'lib/streamworker/workers/worker.rb', line 14

# Error counter, initialised to 0 in #initialize. It is added to num_success
# in #num_queries and passed to the progress callback emitted by #scroll.
def num_errors
  @num_errors
end

#num_records ⇒ Object

Returns the value of attribute num_records.



14
15
16
# File 'lib/streamworker/workers/worker.rb', line 14

# Record count taken from opts[:num_records] in #initialize (converted with
# to_i). Feeds #projected_queries, #scroll and #report_timeout_footer.
def num_records
  @num_records
end

#num_success ⇒ Object

Returns the value of attribute num_success.



14
15
16
# File 'lib/streamworker/workers/worker.rb', line 14

# Success counter, initialised to 0 in #initialize. It is added to num_errors
# in #num_queries and used for the default texts in #report_timeout_footer.
def num_success
  @num_success
end

#opts ⇒ Object

Returns the value of attribute opts.



10
11
12
# File 'lib/streamworker/workers/worker.rb', line 10

# Indifferent-access copy of the options given to #initialize, with
# :unicorn_timeout filled in and converted to an Integer there.
def opts
  @opts
end

#repeats ⇒ Object

Returns the value of attribute repeats.



11
12
13
# File 'lib/streamworker/workers/worker.rb', line 11

# Repeat count from opts[:repeats] (default 1), stored as an Integer by
# #initialize. #header mentions it in the page when it is greater than 1.
def repeats
  @repeats
end

#title ⇒ Object

Returns the value of attribute title.



13
14
15
# File 'lib/streamworker/workers/worker.rb', line 13

# Page title interpolated into the <title> element by #head; #initialize
# sets it to "Working...".
def title
  @title
end

#view_context ⇒ Object

Returns the value of attribute view_context.



10
11
12
# File 'lib/streamworker/workers/worker.rb', line 10

# The object passed as the first argument to #initialize. #head calls
# stylesheet_link_tag and javascript_include_tag on it.
def view_context
  @view_context
end

Instance Method Details

#calculate_times ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/streamworker/workers/worker.rb', line 54

# Estimates elapsed and remaining time for the current run.
#
# A theoretical rate (QUERIES_PER_BLOCK queries per TIME_PER_BLOCK) is scaled
# by the ratio of the time actually used to the time that rate predicts for
# the queries completed so far.
#
# @return [Hash] with keys :work_time, :work_time_remaining, :time_used,
#   :time_remaining and :total_time
def calculate_times
  actual_time_used = Time.now - @started_at
  work_time_remaining = opts[:unicorn_timeout] - actual_time_used
  theoretical_total_time = (projected_queries / QUERIES_PER_BLOCK) * TIME_PER_BLOCK
  theoretical_time_used = (num_queries / QUERIES_PER_BLOCK) * TIME_PER_BLOCK

  # Until at least one query has completed there is nothing to measure
  # against. Dividing by zero would produce Infinity/NaN and leak into
  # :total_time and :time_remaining, so assume the theoretical rate instead.
  factor = theoretical_time_used > 0 ? actual_time_used.to_f / theoretical_time_used : 1.0
  # For jobs larger than one block, never project faster than the theoretical rate.
  factor = [factor, 1].max if projected_queries > QUERIES_PER_BLOCK
  total_time = theoretical_total_time * factor

  {
    work_time: opts[:unicorn_timeout].to_i,
    work_time_remaining: work_time_remaining,
    time_used: actual_time_used,
    time_remaining: (total_time - actual_time_used),
    total_time: total_time
  }
end

#close_report_line ⇒ Object



198
199
200
# File 'lib/streamworker/workers/worker.rb', line 198

# Closes the report line that is currently open, if there is one.
#
# @return [String] the closing markup produced by #report_line while a
#   fragment is in progress, otherwise an empty string
def close_report_line
  return "" unless fragment?

  report_line("", close: true)
end

#each ⇒ Object



243
244
245
# File 'lib/streamworker/workers/worker.rb', line 243

# Base implementation of the Enumerable hook; it only signals that a
# subclass has to provide its own version.
#
# @raise [RuntimeError] always
def each
  raise RuntimeError, "Worker subclasses must implement each to yield their output"
end

#error_line_num ⇒ Object



239
240
241
# File 'lib/streamworker/workers/worker.rb', line 239

# Wraps the current #line_num in a "badge-important" span.
#
# @return [String] HTML fragment
def error_line_num
  badge_classes = "badge badge-important badge-line-num"
  %Q{<span class="#{badge_classes}">#{line_num}</span>}
end

#foot ⇒ Object



171
172
173
174
175
176
# File 'lib/streamworker/workers/worker.rb', line 171

# Closing tags for the document opened by #head.
#
# @return [String] the </body> and </html> markup
def foot
  "\n</body>\n    </html>\n"
end


157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/streamworker/workers/worker.rb', line 157

# Renders the end of the page: every queued footer message as an <h3>,
# the #scroll script, and the closing markup from #foot.
#
# @param msg [String, nil] optional message queued before rendering;
#   blank values are ignored
# @return [String] HTML
def footer(msg=nil)
  @footer_messages << msg unless msg.blank?

  headings =
    if @footer_messages.empty?
      ""
    else
      "<h3>#{@footer_messages.join('</h3><h3>')}</h3>"
    end

  <<-EOHTML
    </div>
    #{headings}
    #{scroll}

  </div>#{foot}
  EOHTML
end

#fragment? ⇒ Boolean

Returns:

  • (Boolean)


190
191
192
# File 'lib/streamworker/workers/worker.rb', line 190

# Whether a report line is currently open. #initialize sets the flag to
# false and #report_line updates it to the negation of its :close option.
def fragment?
  @fragment
end

#head(scroll = true) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/streamworker/workers/worker.rb', line 138

# Opening markup of the streamed document: doctype, <html>, the <head> with
# the application assets and title, and the opening <body> tag.
#
# @param scroll [Boolean] when falsy, the <html> element gets an inline
#   "overflow: hidden" style
# @return [String] HTML
def head(scroll=true)
  html_attrs =
    if scroll
      ""
    else
      %Q{ style="overflow: hidden;"}
    end

  %Q{
    <!DOCTYPE html>
    <html class="white"#{html_attrs}>
<head>
  #{view_context.stylesheet_link_tag('application')}
  #{view_context.javascript_include_tag('application')}
  #{view_context.javascript_include_tag('scroller')}
  <title>#{title}</title>
</head>
<body class="stream-worker-results">      
  }
end

#header ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/streamworker/workers/worker.rb', line 123

# Renders the start of the page (#head, the container and the results
# wrapper), logs it at debug level, and returns it padded with spaces to
# 1025 characters.
#
# @return [String] HTML, at least 1025 characters long
def header
  repeat_note = @repeats > 1 ? %Q{<p class="muted">Repeating #{@repeats} times</p>} : ""

  markup = [
    %Q{  #{head}<div class="container">},
    "      #{repeat_note}",
    %Q{      <div class="import-results">},
    ""
  ].join("\n")
  Rails.logger.debug markup

  # Safari waits until it gets the first 1024 bytes to start displaying
  markup.ljust(1025)
end

#imminent_timeout? ⇒ Boolean

Returns:

  • (Boolean)


87
88
89
90
91
92
# File 'lib/streamworker/workers/worker.rb', line 87

# Tells whether the remaining work time reported by #calculate_times has
# dropped below TIMEOUT_MARGIN.
#
# @return [Boolean]
def imminent_timeout?
  remaining = calculate_times[:work_time_remaining]
  remaining < TIMEOUT_MARGIN
end

#num_queries ⇒ Object



50
51
52
# File 'lib/streamworker/workers/worker.rb', line 50

# Queries accounted for so far: records handled (successes plus errors)
# multiplied by #queries_per_record.
#
# @return [Float]
def num_queries
  processed = num_success + num_errors
  processed.to_f * queries_per_record
end

#open_report_line(str) ⇒ Object



186
187
188
# File 'lib/streamworker/workers/worker.rb', line 186

# Emits +str+ through #report_line with the :close option set to false.
#
# @param str [String] content for the line
# @return [String] whatever #report_line returns
def open_report_line(str)
  keep_open = { close: false }
  report_line(str, keep_open)
end

#projected_queries ⇒ Object



46
47
48
# File 'lib/streamworker/workers/worker.rb', line 46

# Total queries expected for the whole job: #num_records multiplied by
# #queries_per_record.
#
# @return [Float]
def projected_queries
  query_count = num_records * queries_per_record
  query_count.to_f
end


153
154
155
# File 'lib/streamworker/workers/worker.rb', line 153

# Queues a message for the footer without rendering anything.
#
# @param msg [Object] appended as-is (no blank check here)
# @return [Array] the footer message list
def push_footer_message(msg)
  @footer_messages.push(msg)
end

#queries_per_record ⇒ Object



42
43
44
# File 'lib/streamworker/workers/worker.rb', line 42

# Multiplier applied per record in #projected_queries and #num_queries.
# The base class uses 1; presumably subclasses override it when a record
# needs several queries (not visible in this file).
def queries_per_record
  1
end

#report_error(str, list = []) ⇒ Object



221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/streamworker/workers/worker.rb', line 221

def report_error(str, list=[])
  err = %Q{
      #{error_line_num}
      <div class="alert alert-error">
          <p class="text-error"><i class="icon-warning-sign icon-large"></i>#{str}</p>
        }
  err << %Q{<ul class="error-list">\n} unless list.empty?
  list.each { |e| err << %Q{              <li>#{e}</li>\n} }
  err << %Q{              </ul>\n} unless list.empty?
  err << %Q{           </div>}
  
  err
end

#report_fragment(str) ⇒ Object



194
195
196
# File 'lib/streamworker/workers/worker.rb', line 194

# Adds +str+ to the current report line, opening a new line first when
# none is in progress.
#
# @param str [String]
# @return [String] +str+ itself while a fragment is open, otherwise the
#   result of #open_report_line
def report_fragment(str)
  return str if fragment?

  open_report_line(str)
end

#report_line(str, opts = {}) ⇒ Object



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/streamworker/workers/worker.rb', line 202

# Renders one report line, or a piece of one.
#
# The opening <p> is emitted only when no fragment is in progress. The
# closing </p> and the #scroll script are emitted only when :close is
# truthy. The fragment flag is left set whenever the line stays open.
#
# @param str [String] content
# @param opts [Hash] :close (default true) and :class (extra CSS class)
# @return [String] HTML
def report_line(str, opts={})
  options = {close: true}.merge(opts)
  css_classes = ["report-line", options[:class]].compact.join(" ")

  opening = ""
  unless fragment?
    opening = %Q{
      <p class="#{css_classes}">}
  end

  closing = ""
  if options[:close]
    @fragment = false
    closing = %Q{</p>
        #{scroll}
    }
  else
    @fragment = true
  end

  "#{opening}#{str}#{closing}"
end


94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/streamworker/workers/worker.rb', line 94

# Renders the "Server Timeout!" alert that ends the page when processing
# has to stop early, followed by the #scroll script and #foot.
#
# @param msg [Hash] optional text overrides:
#   :work_desc     - what was completed (default "<num_success> records")
#   :how_to_finish - what the user should do next (default "resubmit the
#                    last <n> records")
# @return [String] HTML
def report_timeout_footer(msg={})
  # Resolve defaults into locals so the caller's hash is never modified
  # (it may be shared or frozen).
  work_desc = msg[:work_desc] || "#{num_success} records"
  how_to_finish = msg[:how_to_finish] || "resubmit the last #{num_records - num_success} records"
  # The template ends the sentence itself; drop one trailing period so the
  # page does not show "records..".
  how_to_finish = how_to_finish.to_s.sub(/\.\s*\z/, "")
  times = calculate_times
  %Q{
    </div>
    <hr/>
    <div class="alert alert-error alert_block span8">

      <h4><i class="icon-time icon-large pull-left"></i>Server Timeout!</h4>
      <br/>
        Unfortunately, the backend processing time is limited to #{times[:work_time]} seconds, so we have to stop processing this job after #{work_desc}.

      <br/><br/>
        To finish processing, please #{how_to_finish}.

    </div>
    #{scroll}
  </div>#{foot}
  }
end

#scroll ⇒ Object



178
179
180
181
182
183
184
# File 'lib/streamworker/workers/worker.rb', line 178

# Inline script that scrolls the output to the bottom and passes the
# record, success and error counts to the parent frame's progress callback.
#
# @return [String] a <script> element
def scroll
  progress_args = [num_records, num_success, num_errors].join(", ")
  %Q{<script type="text/javascript">
        scrollBottom();
        parent.update_stream_worker_progress(#{progress_args});
      </script>
  }
end

#set_headers(response) ⇒ Object



116
117
118
119
120
121
# File 'lib/streamworker/workers/worker.rb', line 116

# Prepares a response for streaming: stamps Last-Modified with the current
# time, removes Content-Length, disables caching and declares chunked
# transfer encoding.
#
# @param response [#headers] object exposing a mutable, Hash-like headers
#   collection
# @return [String] the last value assigned ('chunked')
def set_headers(response)
  # HTTP dates must be expressed in GMT in the fixed "IMF-fixdate" layout;
  # Time#ctime would emit local time with no zone information.
  response.headers['Last-Modified'] = Time.now.utc.strftime('%a, %d %b %Y %H:%M:%S GMT')
  response.headers.delete('Content-Length')
  response.headers['Cache-Control'] = 'no-cache'
  response.headers['Transfer-Encoding'] = 'chunked'
end

#success_line_num ⇒ Object



235
236
237
# File 'lib/streamworker/workers/worker.rb', line 235

# Wraps the current #line_num in a "badge-success" span.
#
# @return [String] HTML fragment
def success_line_num
  badge_classes = "badge badge-success badge-line-num"
  %Q{<span class="#{badge_classes}">#{line_num}</span>}
end