Class: HadoopDsl::LogAnalysis::LogAnalysisMapper
Overview
Constant Summary
collapse
- @@reg_cache =
{}
Instance Attribute Summary
Attributes inherited from BaseMapRed
#emitted
Instance Method Summary
collapse
Methods inherited from BaseMapper
#identity
Methods inherited from BaseMapRed
#emit
#pre_process, #run
Methods included from DslElement
#method_missing
Constructor Details
Returns a new instance of LogAnalysisMapper.
14
15
16
|
# File 'lib/log_analysis.rb', line 14
def initialize(script, key, value)
super(script, LogAnalysisMapperModel.new(key, value))
end
|
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
in the class HadoopDsl::DslElement
Instance Method Details
#column_name(*names) ⇒ Object
column names by String converted to Symbol
46
47
48
49
|
# File 'lib/log_analysis.rb', line 46
def column_name(*names)
sym_names = names.map {|name| name.is_a?(String) ? name.to_sym : name }
@model.create_or_replace_columns_with(sym_names) {|column, name| column.name = name}
end
|
#count_uniq(column_or_value) ⇒ Object
74
75
76
77
78
79
80
81
82
83
84
|
# File 'lib/log_analysis.rb', line 74
def count_uniq(column_or_value)
uniq_key =
case column_or_value
when LogAnalysisMapperModel::Column
column = column_or_value
column.value
else column_or_value end
current_topic.key_elements << uniq_key
emit(current_topic.key => 1)
end
|
#group_by(column_or_value) ⇒ Object
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/log_analysis.rb', line 51
def group_by(column_or_value)
case column_or_value
when LogAnalysisMapperModel::Column
column = column_or_value
current_topic.key_elements << column.value
else
value = column_or_value
current_topic.key_elements << value
end
end
|
#group_date_by(column, term) ⇒ Object
62
63
64
65
66
67
68
69
70
71
|
# File 'lib/log_analysis.rb', line 62
def group_date_by(column, term)
require 'time'
time = parse_time(column.value)
time_key = case term
when :daily then time.strftime('%Y%m%d')
when :monthly then time.strftime('%Y%m')
when :yearly then time.strftime('%Y')
end
current_topic.key_elements << time_key
end
|
#pattern(reg_str) ⇒ Object
32
33
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/log_analysis.rb', line 32
def pattern(reg_str)
cached = @@reg_cache[reg_str]
re = cached ? @@reg_cache[reg_str] : Regexp.new(reg_str)
@@reg_cache[reg_str] ||= re
if value =~ re
md = Regexp.last_match
@model.create_or_replace_columns_with(md.captures) {|column, value| column.value = value}
else throw :each_line end
end
|
#separate(sep) ⇒ Object
27
28
29
30
|
# File 'lib/log_analysis.rb', line 27
def separate(sep)
parts = value.split(sep)
@model.create_or_replace_columns_with(parts) {|column, value| column.value = value}
end
|
#sum(column) ⇒ Object
86
87
88
|
# File 'lib/log_analysis.rb', line 86
def sum(column)
emit(current_topic.key => column.value.to_i)
end
|
#topic(desc, options = {}, &block) ⇒ Object
21
22
23
24
25
|
# File 'lib/log_analysis.rb', line 21
def topic(desc, options = {}, &block)
@model.create_topic(desc, options)
yield if block_given?
current_topic
end
|