Module: DataCollector::Core
- Included in:
- Runner
- Defined in:
- lib/data_collector/core.rb
Class Method Summary
- .config ⇒ Object
- .error(message) ⇒ Object
- .filter(data, filter_path) ⇒ Object
  JSONPath evaluator: jsonpath.com/. Explanation: goessner.net/articles/JsonPath/index.html.
- .input ⇒ Object
  Read input from a URI. Example: input.from_uri("www.libis.be"), input.from_uri("file://hello.txt").
- .log(message) ⇒ Object
- .logger(*destinations) ⇒ Object
- .output ⇒ Object
- .pipeline ⇒ Object
  Pipeline for your data. Example: pipeline.on_message do |input, output| ** processing logic here ** end.
- .rules ⇒ Object
  You can apply rules to input. A rule is made up of a Hash: the key is the map key field, and its value is a Hash with a JSONPath filter and options to apply a convert method to the filtered results.
- .rules_ng ⇒ Object
  New rules runner.
Class Method Details
.config ⇒ Object
# File 'lib/data_collector/core.rb', line 123
def config
  @config ||= ConfigFile
end
.error(message) ⇒ Object
# File 'lib/data_collector/core.rb', line 134
def error(message)
  @logger ||= self.logger
  @logger.error(message)
end
.filter(data, filter_path) ⇒ Object
JSONPath evaluator: jsonpath.com/. Explanation: goessner.net/articles/JsonPath/index.html
# File 'lib/data_collector/core.rb', line 104
def filter(data, filter_path)
  filtered = []
  if filter_path.is_a?(Array) && data.is_a?(Array)
    filtered = data.map {|m| m.select {|k, v| filter_path.include?(k.to_sym)}}
  elsif filter_path.is_a?(String)
    filtered = JsonPath.on(data, filter_path)
  end

  filtered = [filtered] unless filtered.is_a?(Array)
  filtered = filtered.first if filtered.length == 1 && filtered.first.is_a?(Array)

  filtered
rescue StandardError => e
  @logger ||= self.logger
  @logger.error("#{filter_path} failed: #{e.message}")
  []
end
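The Array branch of the listing above compares each key after calling `to_sym` on it, so the key list apparently has to contain symbols even when the data uses string keys. The following is a minimal sketch of that branch only; `filter_by_keys` and `ROWS` are hypothetical names introduced for illustration, and the JSONPath branch is left out so the sketch needs no external gem.

```ruby
# Hypothetical stand-in for the Array branch of the filter method.
# It is not part of DataCollector; it only mirrors the logic shown above.
def filter_by_keys(data, keys)
  filtered = data.map { |row| row.select { |k, _v| keys.include?(k.to_sym) } }

  # Same post-processing steps as in the listing.
  filtered = [filtered] unless filtered.is_a?(Array)
  filtered = filtered.first if filtered.length == 1 && filtered.first.is_a?(Array)
  filtered
end

# Sample rows invented for this sketch.
ROWS = [
  { 'id' => 1, 'lang' => 'nl', 'title' => 'Een' },
  { 'id' => 2, 'lang' => 'fr', 'title' => 'Deux' }
].freeze

# Symbols match because every key is converted with to_sym before the lookup.
p filter_by_keys(ROWS, [:id, :lang])

# Strings never match a symbol, so every row comes back empty.
p filter_by_keys(ROWS, ['id'])
```

Passing a String as the filter takes the other branch, which delegates to `JsonPath.on` and is not reproduced here.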
.input ⇒ Object
Read input from a URI. Example:
  input.from_uri("www.libis.be")
  input.from_uri("file://hello.txt")
# File 'lib/data_collector/core.rb', line 24
def input
  @input ||= DataCollector::Input.new
end
.log(message) ⇒ Object
# File 'lib/data_collector/core.rb', line 128
def log(message)
  @logger ||= self.logger
  @logger.info(message)
end
.logger(*destinations) ⇒ Object
# File 'lib/data_collector/core.rb', line 140
def logger(*destinations)
  @logger ||= begin
    destinations = STDOUT if destinations.nil? || destinations.empty?
    Logger.new(ProxyLogger.new(destinations))
  rescue StandardError => e
    puts "Unable to instantiate ProxyLogger: #{e.message}"
    Logger.new(STDOUT)
  end
end
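Because the logger is memoised with `||=`, the destinations appear to matter only on the first call; later calls return the same object whatever is passed in. The sketch below reproduces that pattern with Ruby's standard `Logger` alone. `LoggingSketch` and `BUFFER` are hypothetical names, and `ProxyLogger` is deliberately left out since its behaviour is not documented here.

```ruby
require 'logger'
require 'stringio'

# Hypothetical module that mirrors the memoised-logger pattern shown above.
# It writes to a single IO instead of going through ProxyLogger.
module LoggingSketch
  def self.logger(io = $stdout)
    # The io argument is only used the first time; afterwards @logger is reused.
    @logger ||= Logger.new(io)
  end

  def self.log(message)
    logger.info(message)
  end

  def self.error(message)
    logger.error(message)
  end
end

# Configure the destination before the first log call.
BUFFER = StringIO.new
LoggingSketch.logger(BUFFER)

LoggingSketch.log('collector started')
LoggingSketch.error('collector failed')

# A second destination is ignored because the logger already exists.
LoggingSketch.logger(StringIO.new)

puts BUFFER.string
```

Under this pattern, calling `log` or `error` before configuring a destination would create the default logger first, so any custom destination should be set up front.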
.output ⇒ Object
Output is an object in which you can store data that needs to be written to an output stream:
  output[:name] = 'John'
  output[:last_name] = 'Doe'
Write output to a file or a string, using an ERB file as a template. Example: test.erb
<names>
<combined><%= data[:name] %> <%= data[:last_name] %></combined>
<%= print data, :name, :first_name %>
<%= print data, :last_name %>
</names>
will produce
<names>
<combined>John Doe</combined>
<first_name>John</first_name>
<last_name>Doe</last_name>
</names>
Into a variable:
  result = output.to_s("test.erb")
Into a file stored in the records dir:
  output.to_file("test.erb")
Into a tar file stored in data:
  output.to_file("test.erb", "my_data.tar.gz")
Into a temp directory:
  output.to_tmp_file("test.erb", "directory")
# File 'lib/data_collector/core.rb', line 57
def output
  @output ||= Output.new
end
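To see how a template like test.erb could turn stored data into XML, here is a minimal sketch that uses only Ruby's standard `ERB` class. `NamesView`, `element`, and `NAMES_TEMPLATE` are hypothetical; `element` is an assumed stand-in for the `print` helper in the template above, and the real `Output` class may escape or format values differently.

```ruby
require 'erb'

# Hypothetical view object; DataCollector's own Output class is not used here.
class NamesView
  attr_reader :data

  def initialize(data)
    @data = data
  end

  # Assumed stand-in for the `print` helper: wraps data[key] in an element
  # called `name`, which defaults to the key itself. No XML escaping is done.
  def element(key, name = key)
    "<#{name}>#{data[key]}</#{name}>"
  end

  # Evaluate the template with this object's methods in scope.
  def render(template)
    ERB.new(template).result(binding)
  end
end

NAMES_TEMPLATE = <<~TEMPLATE
  <names>
    <combined><%= data[:name] %> <%= data[:last_name] %></combined>
    <%= element(:name, :first_name) %>
    <%= element(:last_name) %>
  </names>
TEMPLATE

puts NamesView.new({ name: 'John', last_name: 'Doe' }).render(NAMES_TEMPLATE)
```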
.pipeline ⇒ Object
Pipeline for your data. Example:
  pipeline.on_message do |input, output|
    ** processing logic here **
  end
# File 'lib/data_collector/core.rb', line 17
def pipeline
  @input ||= DataCollector::Pipeline.new
end
.rules ⇒ Object
You can apply rules to input. A rule is made up of a Hash: the key is the map key field, and its value is a Hash with a JSONPath filter and options to apply a convert method to the filtered results.
Available convert methods are: time, map, each, call, suffix
- time: parses a given time/date string into a Time object
- map: applies a mapping to a filter
- suffix: adds a suffix to a result
- call: executes a lambda on the filter
- each: runs a lambda on each row of a filter
Example:
  my_rules = {
    'identifier' => {"filter" => '$..id'},
    'language' => {'filter' => '$..lang',
                   'options' => {'convert' => 'map',
                                 'map' => {'nl' => 'dut', 'fr' => 'fre', 'de' => 'ger', 'en' => 'eng'}
                                }
                  },
    'subject' => {'filter' => '$..keywords',
                  'options' => {'convert' => 'each',
                                'lambda' => lambda {|d| d.split(',')}
                               }
                 },
    'creationdate' => {'filter' => '$..published_date', 'convert' => 'time'}
  }
  rules.run(my_rules, input, output)
# File 'lib/data_collector/core.rb', line 89
def rules
  #DataCollector::Core.log('RULES depricated using RULESNG')
  #rules_ng
  @rules ||= Rules.new
end
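The convert methods listed above can be pictured as a small dispatch over the options Hash. The sketch below is an assumption about how such a step might behave, not the implementation inside `Rules`: `apply_convert` and `LANGUAGE_OPTIONS` are hypothetical, unmapped values are passed through unchanged by choice, and the `'suffix'` option key is guessed from the method name.

```ruby
require 'time'

# Hypothetical convert step applied to already-filtered values.
# The real Rules class may handle missing keys and errors differently.
def apply_convert(values, options = {})
  case options['convert']
  when 'time'   then values.map { |v| Time.parse(v) }
  when 'map'    then values.map { |v| options['map'].fetch(v, v) } # keep unmapped values
  when 'suffix' then values.map { |v| "#{v}#{options['suffix']}" }
  when 'each'   then values.map { |v| options['lambda'].call(v) }  # lambda per row
  when 'call'   then options['lambda'].call(values)                # lambda on the whole result
  else values
  end
end

# Options taken from the 'language' rule in the example above.
LANGUAGE_OPTIONS = {
  'convert' => 'map',
  'map' => { 'nl' => 'dut', 'fr' => 'fre', 'de' => 'ger', 'en' => 'eng' }
}.freeze

p apply_convert(%w[nl en], LANGUAGE_OPTIONS)
p apply_convert(['history,art'], { 'convert' => 'each', 'lambda' => ->(d) { d.split(',') } })
p apply_convert(['2021-03-04'], { 'convert' => 'time' }).first.year
```

The difference between `each` and `call` in this sketch is only the argument the lambda receives: one filtered row at a time versus the whole filtered result.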