Class: Fluent::Plugin::TwitterInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_twitter.rb

Constant Summary collapse

# Timeline mode symbols. Frozen so the shared constant cannot be mutated at
# runtime. NOTE(review): presumably the allowed values of the timeline
# setting -- confirm against the config_param declaration.
TIMELINE_TYPE =
%i[userstream sampling location tracking].freeze
# Output format symbols; these are the values #get_message switches on.
OUTPUT_FORMAT_TYPE =
%i[nest flat simple].freeze

Instance Method Summary collapse

Constructor Details

#initialize ⇒ TwitterInput

Returns a new instance of TwitterInput.



35
36
37
38
# File 'lib/fluent/plugin/in_twitter.rb', line 35

# Builds the plugin instance in the stopped state.
#
# @running is the flag #handle_object checks before processing anything;
# it is switched on in #start and off again in #shutdown.
def initialize
  super
  @running = false
end

Instance Method Details

#array_key_to_s(array) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/fluent/plugin/in_twitter.rb', line 154

# Returns a copy of +array+ prepared for emission: nested hashes go through
# #hash_key_to_s, nested arrays are processed recursively, and strings have
# invalid byte sequences removed. Every other element is passed through.
#
# @param array [Array] possibly nested collection of values
# @return [Array] new array with the same element order
def array_key_to_s(array)
  array.map do |element|
    # instance_of? is deliberate: only exact Hash/Array/String instances
    # are transformed, subclasses are left untouched.
    next hash_key_to_s(element) if element.instance_of?(Hash)
    next array_key_to_s(element) if element.instance_of?(Array)
    next element.scrub('') if element.instance_of?(String)

    element
  end
end

#configure(conf) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/fluent/plugin/in_twitter.rb', line 40

# Applies the configuration and prepares the streaming client.
#
# After the parent class has processed +conf+, the literal placeholder
# '${hashtag}' in the keyword setting is replaced by '#', and a
# Twitter::Streaming::Client is built from the credential settings.
#
# @param conf the configuration object handed to the parent implementation
def configure(conf)
  super

  # Only rewrite the keyword when one was configured.
  unless @keyword.nil?
    @keyword = @keyword.gsub('${hashtag}', '#')
  end

  @client = Twitter::Streaming::Client.new do |settings|
    settings.consumer_key        = @consumer_key
    settings.consumer_secret     = @consumer_secret
    settings.access_token        = @access_token
    settings.access_token_secret = @access_token_secret
    # The proxy is optional; it is handed to the client as a plain hash.
    settings.proxy = @proxy.to_h if @proxy
  end
end

#get_message(tweet) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/fluent/plugin/in_twitter.rb', line 104

# Converts +tweet+ into a record according to @output_format and emits it
# through the router under @tag, timestamped with Fluent::Engine.now.
#
# * :nest   - the full tweet hash with keys converted by #hash_key_to_s
# * :flat   - the full tweet hash flattened by #hash_flatten
# * :simple - a small hash of selected tweet and user attributes
#
# Any other format value leaves the record nil.
#
# @param tweet the tweet object to convert
# @return whatever router.emit returns
def get_message(tweet)
  record =
    case @output_format
    when :nest then hash_key_to_s(tweet.to_h)
    when :flat then hash_flatten(tweet.to_h)
    when :simple
      {
        'message' => tweet.text.scrub(''),
        'geo' => tweet.geo,
        'place' => tweet.place,
        'created_at' => tweet.created_at.to_s,
        'user_name' => tweet.user.name,
        'user_screen_name' => tweet.user.screen_name,
        'user_profile_image_url' => tweet.user.profile_image_url.to_s,
        'user_time_zone' => tweet.user.time_zone,
        'user_lang' => tweet.user.lang
      }
    end

  router.emit(@tag, Fluent::Engine.now, record)
end

#handle_object(object) ⇒ Object



86
87
88
89
90
91
# File 'lib/fluent/plugin/in_twitter.rb', line 86

# Callback handed to the streaming client for every received object.
# Nothing happens unless the plugin is running and #is_message? accepts the
# object; accepted objects are passed to #get_message.
#
# @param object the object yielded by the streaming client
# @return the result of #get_message, or nil when the object is ignored
def handle_object(object)
  get_message(object) if @running && is_message?(object)
end

#hash_flatten(record, prefix = nil) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/fluent/plugin/in_twitter.rb', line 125

# Flattens a nested hash into a single-level hash with string keys.
#
# Nested keys are joined with @flatten_separator; string values have invalid
# byte sequences removed. Only exact Hash instances are descended into, so
# arrays and other values are stored as they are.
#
# @param record [Hash] the (possibly nested) hash to flatten
# @param prefix [String, nil] key prefix accumulated during recursion
# @return [Hash] new flat hash
def hash_flatten(record, prefix = nil)
  record.each_with_object({}) do |(key, value), flat|
    path = prefix.to_s + key.to_s
    if value.instance_of?(Hash)
      flat.update(hash_flatten(value, path + @flatten_separator))
    else
      flat[path] = value.instance_of?(String) ? value.scrub("") : value
    end
  end
end

#hash_key_to_s(hash) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/fluent/plugin/in_twitter.rb', line 138

# Returns a copy of +hash+ whose keys are all strings.
#
# Values are converted along the way: nested hashes recursively, arrays via
# #array_key_to_s, and strings by removing invalid byte sequences. Anything
# else is copied over unchanged.
#
# @param hash [Hash] source hash with arbitrary keys
# @return [Hash] new hash keyed by strings
def hash_key_to_s(hash)
  hash.each_with_object({}) do |(key, value), converted|
    converted[key.to_s] =
      if value.instance_of?(Hash)
        hash_key_to_s(value)
      elsif value.instance_of?(Array)
        array_key_to_s(value)
      elsif value.instance_of?(String)
        value.scrub('')
      else
        value
      end
  end
end

#is_message?(tweet) ⇒ Boolean

Returns:

  • (Boolean)


93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/in_twitter.rb', line 93

# Decides whether a streamed object should be turned into a record.
#
# An object is accepted when all of the following hold:
# * it is a Twitter::Tweet;
# * no language filter is set (@lang nil or ''), or the author's language
#   is contained in @lang;
# * the timeline is not :userstream or no keyword is set, or the tweet text
#   matches one of the comma-separated keywords (case-insensitive, after
#   NKF normalisation of both sides).
#
# @param tweet the object received from the streaming client
# @return [Boolean] true when the object should be emitted
def is_message?(tweet)
  return false unless tweet.is_a?(Twitter::Tweet)

  unless @lang.nil? || @lang == ''
    user_lang = tweet.user.lang
    # A tweet without a user language can never satisfy the filter. Checking
    # for nil first also avoids the TypeError String#include? raises for nil.
    return false if user_lang.nil? || !@lang.include?(user_lang)
  end

  if @timeline == :userstream && !@keyword.nil? && @keyword != ''
    # NOTE(review): the keyword list is compiled as a regular expression, so
    # regex metacharacters in keywords are interpreted, not matched literally.
    pattern = NKF.nkf('-WwZ1', @keyword).gsub(/,\s?/, '|')
    text = NKF.nkf('-WwZ1', tweet.text)
    return false unless Regexp.new(pattern, Regexp::IGNORECASE).match(text)
  end

  true
end

#run ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/fluent/plugin/in_twitter.rb', line 67

# Logs the effective settings and then starts consuming the streaming
# endpoint that matches them, passing every received object to
# #handle_object.
#
# Endpoint selection, first match wins:
# * :sampling or :tracking with a keyword -> filter(track: keyword)
# * :tracking with follow ids             -> filter(follow: ids)
# * :sampling without keyword/follow ids  -> sample
# * :userstream                           -> user
#
# Any other combination starts nothing; a warning is logged so the
# misconfiguration does not go unnoticed.
def run
  notice = "twitter: starting Twitter Streaming API for #{@timeline}."
  notice << " tag:#{@tag}"
  notice << " lang:#{@lang}" unless @lang.nil?
  notice << " keyword:#{@keyword}" unless @keyword.nil?
  # Mention follow ids only when some are configured; this matches how the
  # lang and keyword parts are handled and avoids a dangling " follow:".
  notice << " follow:#{@follow_ids}" unless @follow_ids.nil?
  log.info notice

  if [:sampling, :tracking].include?(@timeline) && @keyword
    @client.filter(track: @keyword, &method(:handle_object))
  elsif @timeline == :tracking && @follow_ids
    @client.filter(follow: @follow_ids, &method(:handle_object))
  elsif @timeline == :sampling && @keyword.nil? && @follow_ids.nil?
    @client.sample(&method(:handle_object))
  elsif @timeline == :userstream
    @client.user(&method(:handle_object))
  else
    log.warn "twitter: no streaming endpoint matches timeline #{@timeline} " \
             "with the configured keyword/follow_ids; nothing will be collected."
  end
end

#shutdown ⇒ Object



61
62
63
64
65
# File 'lib/fluent/plugin/in_twitter.rb', line 61

# Stops the plugin.
#
# The running flag is cleared first, so #handle_object ignores any object
# that still arrives, then the streaming client is closed and the parent
# class gets to run its own shutdown.
def shutdown
  @running = false
  # Guarded because not every client object responds to #close.
  @client.close if @client.respond_to?(:close)
  super
end

#start ⇒ Object



54
55
56
57
58
59
# File 'lib/fluent/plugin/in_twitter.rb', line 54

# Starts the plugin: marks it as running and launches a background thread
# that executes #run.
#
# Fluentd's plugin API expects lifecycle overrides such as #start to call
# super, so the parent implementation is invoked before any work begins.
def start
  super
  # Set before the thread starts so #handle_object accepts objects from the
  # first callback on.
  @running = true
  thread_create(:in_twitter) do
    run
  end
end