Class: Fluent::Plugin::TwitterInput
- Inherits: Input
  - Object
  - Input
  - Fluent::Plugin::TwitterInput
- Defined in:
- lib/fluent/plugin/in_twitter.rb
Constant Summary
- TIMELINE_TYPE = %i(userstream sampling location tracking)
- OUTPUT_FORMAT_TYPE = %i(nest flat simple)
Instance Method Summary
- #array_key_to_s(array) ⇒ Object
- #configure(conf) ⇒ Object
- #get_message(tweet) ⇒ Object
- #handle_object(object) ⇒ Object
- #hash_flatten(record, prefix = nil) ⇒ Object
- #hash_key_to_s(hash) ⇒ Object
- #initialize ⇒ TwitterInput (constructor): A new instance of TwitterInput.
- #is_message?(tweet) ⇒ Boolean
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ TwitterInput
Returns a new instance of TwitterInput.
35 36 37 38 |
# File 'lib/fluent/plugin/in_twitter.rb', line 35
# Creates the plugin instance via the parent constructor. Stream handling
# starts out disabled: #start sets @running to true and #shutdown sets it
# back to false.
def initialize
  super
  @running = false # gate checked by #handle_object
end
Instance Method Details
#array_key_to_s(array) ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/fluent/plugin/in_twitter.rb', line 154
# Recursively normalises an array taken from a tweet's hash form.
#
# Each element is handled by its exact class (instance_of?, so subclasses
# fall through to the last branch):
#   Hash   -> converted by #hash_key_to_s (String keys)
#   Array  -> converted recursively
#   String -> invalid byte sequences removed with String#scrub('')
#   other  -> returned unchanged
#
# @param array [Array] array to normalise
# @return [Array] a new array; the input is not modified
def array_key_to_s(array)
  array.map do |element|
    case
    when element.instance_of?(Hash)   then hash_key_to_s(element)
    when element.instance_of?(Array)  then array_key_to_s(element)
    when element.instance_of?(String) then element.scrub('')
    else element
    end
  end
end
#configure(conf) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/fluent/plugin/in_twitter.rb', line 40
# Applies the plugin configuration and builds the streaming client.
#
# After the parent class has processed +conf+, every literal "${hashtag}"
# in @keyword is replaced with "#". NOTE(review): presumably this lets a
# hash sign be written in the config file; confirm. The client is built from
# the four OAuth credentials. When @proxy is set, its Hash form (to_h) is
# passed as the client's proxy.
#
# @param conf [Object] configuration handed to the parent (Fluentd config element — TODO confirm)
# @return [Object] the newly built client
def configure(conf)
  super

  unless @keyword.nil?
    @keyword = @keyword.gsub('${hashtag}', '#')
  end

  @client = Twitter::Streaming::Client.new do |client_config|
    client_config.consumer_key        = @consumer_key
    client_config.consumer_secret     = @consumer_secret
    client_config.access_token        = @access_token
    client_config.access_token_secret = @access_token_secret
    client_config.proxy = @proxy.to_h if @proxy
  end
end
#get_message(tweet) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/fluent/plugin/in_twitter.rb', line 104
# Turns a tweet into a record shaped by @output_format and emits it on @tag,
# timestamped with Fluent::Engine.now.
#
#   :nest   -> the tweet's full hash form with String keys (#hash_key_to_s)
#   :flat   -> the tweet's hash form flattened to one level (#hash_flatten)
#   :simple -> a fixed selection of tweet and author fields. The text is
#              scrubbed of invalid byte sequences; created_at and the profile
#              image URL are converted with to_s.
#
# NOTE(review): any other @output_format leaves the record nil, and nil is
# emitted. Presumably the config restricts it to OUTPUT_FORMAT_TYPE; confirm.
#
# @param tweet [Twitter::Tweet] an object accepted by #is_message?
# @return [Object] the return value of router.emit
def get_message(tweet)
  record =
    case @output_format
    when :nest
      hash_key_to_s(tweet.to_h)
    when :flat
      hash_flatten(tweet.to_h)
    when :simple
      {
        'message' => tweet.text.scrub(''),
        'geo' => tweet.geo,
        'place' => tweet.place,
        'created_at' => tweet.created_at.to_s,
        'user_name' => tweet.user.name,
        'user_screen_name' => tweet.user.screen_name,
        'user_profile_image_url' => tweet.user.profile_image_url.to_s,
        'user_time_zone' => tweet.user.time_zone,
        'user_lang' => tweet.user.lang
      }
    end

  router.emit(@tag, Fluent::Engine.now, record)
end
#handle_object(object) ⇒ Object
86 87 88 89 90 91 |
# File 'lib/fluent/plugin/in_twitter.rb', line 86
# Callback handed to the streaming client (see #run) for every object it
# yields. Nothing is done once @running is false (set by #shutdown).
# Otherwise, objects accepted by #is_message? are passed to #get_message.
#
# @param object [Object] anything yielded by the streaming client
# @return [Object, nil] the result of #get_message, or nil when skipped
def handle_object(object)
  get_message(object) if @running && is_message?(object)
end
#hash_flatten(record, prefix = nil) ⇒ Object
125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/fluent/plugin/in_twitter.rb', line 125
# Flattens a (possibly nested) hash into a single-level hash with String keys.
#
# Keys of nested hashes are prefixed with their parent's key plus
# @flatten_separator. For example, with "_":
#   { user: { name: "a" } }  =>  { "user_name" => "a" }
# String values have invalid byte sequences stripped via String#scrub('').
# All other values are copied unchanged, including arrays, which are not
# descended into. Only exact Hash/String instances are special-cased
# (instance_of?). When two paths flatten to the same key, the one visited
# later wins.
#
# The result is built in a single accumulator with merge!/[]=. The previous
# inject + Hash#merge form copied the whole accumulator once per key, which
# is quadratic in the number of keys.
#
# @param record [Hash] hash to flatten
# @param prefix [String, nil] prefix for every key. Recursive calls pass the
#   parent key already followed by @flatten_separator; nil means no prefix.
# @return [Hash{String => Object}] a new flat hash; +record+ is not modified
def hash_flatten(record, prefix = nil)
  record.each_with_object({}) do |(key, value), flat|
    name = prefix.to_s + key.to_s
    if value.instance_of?(Hash)
      flat.merge!(hash_flatten(value, name + @flatten_separator))
    elsif value.instance_of?(String)
      flat[name] = value.scrub('')
    else
      flat[name] = value
    end
  end
end
#hash_key_to_s(hash) ⇒ Object
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/fluent/plugin/in_twitter.rb', line 138
# Returns a copy of +hash+ with every key converted via to_s, recursively.
#
# Values are handled by their exact class (instance_of?):
#   Hash   -> converted recursively
#   Array  -> converted by #array_key_to_s
#   String -> invalid byte sequences removed with String#scrub('')
#   other  -> copied unchanged
# If two keys stringify to the same value, the later one wins.
#
# @param hash [Hash] hash to convert
# @return [Hash{String => Object}] a new hash; the input is not modified
def hash_key_to_s(hash)
  hash.each_with_object({}) do |(key, value), converted|
    converted[key.to_s] =
      case
      when value.instance_of?(Hash)   then hash_key_to_s(value)
      when value.instance_of?(Array)  then array_key_to_s(value)
      when value.instance_of?(String) then value.scrub('')
      else value
      end
  end
end
#is_message?(tweet) ⇒ Boolean
93 94 95 96 97 98 99 100 101 102 |
# File 'lib/fluent/plugin/in_twitter.rb', line 93
# Decides whether an object received from the stream should be emitted.
#
# Returns false unless +tweet+ is a Twitter::Tweet. When @lang is set and
# non-empty, the author's language (tweet.user.lang) must be contained in
# @lang (include?).
#
# For the :userstream timeline with a non-empty @keyword, the tweet text must
# also match one of the comma-separated keywords:
# - both are passed through NKF with '-WwZ1' (UTF-8 in/out; per the nkf docs,
#   -Z1 folds full-width ASCII to half-width);
# - "," / ", " separators become a regexp alternation;
# - the match ignores case.
#
# A tweet whose author language is nil is rejected when a language filter is
# configured. Previously @lang.include?(nil) raised TypeError when @lang is a
# String, and the exception escaped from the stream callback.
#
# NOTE(review): keywords are not Regexp.escape'd, so regexp metacharacters in
# the configured keyword act as a pattern. Confirm this is intended.
# NOTE(review): when @lang is a String, include? is a substring test.
#
# @param tweet [Object] object yielded by the streaming client
# @return [Boolean] true when the object should be emitted
def is_message?(tweet)
  return false unless tweet.is_a?(Twitter::Tweet)

  if !@lang.nil? && @lang != ''
    user_lang = tweet.user.lang
    return false if user_lang.nil? || !@lang.include?(user_lang)
  end

  if @timeline == :userstream && !@keyword.nil? && @keyword != ''
    pattern = NKF.nkf('-WwZ1', @keyword).gsub(/,\s?/, '|')
    text = NKF.nkf('-WwZ1', tweet.text)
    return false unless Regexp.new(pattern, Regexp::IGNORECASE).match(text)
  end

  true
end
#run ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/fluent/plugin/in_twitter.rb', line 67
# Body of the worker thread started by #start. It logs what is about to be
# streamed, then opens the matching Streaming API request with #handle_object
# as the per-object callback.
#
# Request chosen from @timeline and the configured options:
#   :sampling / :tracking with @keyword -> filter(track: @keyword)
#   :tracking with @follow_ids          -> filter(follow: @follow_ids)
#   :sampling with neither              -> sample
#   :userstream                         -> user
#
# Any other combination opens no stream and is now logged as a warning
# instead of the thread ending silently. Examples: the :location timeline
# (listed in TIMELINE_TYPE but not handled here), or :sampling with only
# @follow_ids.
#
# NOTE(review): presumably the client calls block until the stream is closed
# (see #shutdown); confirm against the twitter gem.
def run
  # Built with join rather than << on an interpolated literal, which fails
  # under frozen_string_literal on older Rubies.
  parts = ["twitter: starting Twitter Streaming API for #{@timeline}.", "tag:#{@tag}"]
  parts << "lang:#{@lang}" unless @lang.nil?
  parts << "keyword:#{@keyword}" unless @keyword.nil?
  # Only mention follow ids when some are configured. The old condition
  # (`@follow_ids.nil? && !@keyword.nil?`) logged an empty "follow:" whenever
  # @keyword was nil.
  parts << "follow:#{@follow_ids}" unless @follow_ids.nil?
  log.info parts.join(' ')

  if [:sampling, :tracking].include?(@timeline) && @keyword
    @client.filter(track: @keyword, &method(:handle_object))
  elsif @timeline == :tracking && @follow_ids
    @client.filter(follow: @follow_ids, &method(:handle_object))
  elsif @timeline == :sampling && @keyword.nil? && @follow_ids.nil?
    @client.sample(&method(:handle_object))
  elsif @timeline == :userstream
    @client.user(&method(:handle_object))
  else
    log.warn "twitter: no streaming request matches timeline=#{@timeline} " \
             "(keyword=#{@keyword.inspect}, follow_ids=#{@follow_ids.inspect}); nothing will be received"
  end
end
#shutdown ⇒ Object
61 62 63 64 65 |
# File 'lib/fluent/plugin/in_twitter.rb', line 61
# Stops the plugin. First clears @running so #handle_object ignores anything
# still arriving. Then closes the streaming client, but only if it responds
# to close. Finally hands over to the parent's shutdown.
#
# @return [Object] the parent's shutdown result
def shutdown
  @running = false
  client = @client
  if client.respond_to?(:close)
    client.close
  end
  super
end
#start ⇒ Object
54 55 56 57 58 59 |
# File 'lib/fluent/plugin/in_twitter.rb', line 54
# Starts the plugin. Runs the parent's start, enables object handling
# (@running, checked by #handle_object), then launches #run in a thread
# created with thread_create under the title :in_twitter.
#
# `super` is now called first. #initialize, #configure and #shutdown all
# chain to the parent, but #start did not. Fluentd's plugin API expects
# lifecycle overrides to call super so the base class and plugin helpers
# can do their own start-up work (including the helper providing
# thread_create).
def start
  super
  @running = true
  thread_create(:in_twitter) do
    run
  end
end