46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/qswarm/connections/twitter.rb', line 46
def run
begin
if @track
Qswarm.logger.info "[#{@agent.name.inspect} #{@name.inspect}] Tracking keywords: " + @track.to_s
TweetStream::Client.new.track( @track.values.flatten.reject { |k| /^-/.match(k) } ) do |status|
@track.each do |group, list|
matches = []
list.each do |keyword|
if keyword.split(' ').select { |k| /^-/.match(k) }.none? { |word| status.text.downcase.include? word[1..-1].downcase }
if keyword.split(' ').reject { |k| /^-/.match(k) }.all? { |word| status.text.downcase.include? word.downcase }
matches << keyword
end
end
end
if !matches.empty?
Qswarm.logger.info "[#{@agent.name.inspect} #{@name.inspect}] Sending :track/#{group.inspect} #{status.user.screen_name} :: #{status.text} :: #{matches.to_s}"
emit(:raw => status.attrs, :headers => { :type => :track, :group => group, :matches => matches })
end
end
end.on_error do |message|
Qswarm.logger.error "[#{@agent.name.inspect} #{@name.inspect}] #{message}"
end
end
if @follow
Qswarm.logger.info "[#{@agent.name.inspect} #{@name.inspect}] Tracking Users: " + @follow.to_s
TweetStream::Client.new.follow( *@follow.values.flatten ) do |status|
@follow.each do |group, users|
if users.include?(status.user.id)
Qswarm.logger.info "[#{@agent.name.inspect} #{@name.inspect}] Sending :follow/#{group.inspect} #{status.user.screen_name} :: #{status.text}"
emit(:raw => status.attrs, :headers => { :type => :follow, :group => group, :user_id => status.user.id })
end
end
end.on_error do |message|
Qswarm.logger.error "[#{@agent.name.inspect} #{@name.inspect}] #{message}"
end
end
rescue TweetStream::ReconnectError
Qswarm.logger.info "[#{@agent.name.inspect} #{@name.inspect}] Hit max reconnects, restarting tweetstream in 60 seconds ..."
EM.timer(60, run)
end
if @list
timer = 30
since_id = {}
Qswarm.logger.info "[#{@agent.name.inspect} #{@name.inspect}] Tracking List: " + @list.to_s + " every #{timer} seconds"
@list.each do |group, lists|
lists.each do |user, slug|
@rest_client.list_timeline(user, slug).each do |status|
since_id["#{user}/#{slug}"] = status.attrs[:id] and break
end
end
EventMachine::PeriodicTimer.new(timer) do
lists.each do |user, slug|
begin
@rest_client.list_timeline(user, slug, { :since_id => since_id["#{user}/#{slug}"] }).each do |status|
Qswarm.logger.info "[#{@agent.name.inspect} #{@name.inspect}] Sending :list/#{slug.inspect} #{status.attrs[:user][:screen_name]} :: #{status.text}"
emit(:raw => status.attrs, :headers => { :type => :list, :group => group, :user_id => user, :slug => slug })
since_id["#{user}/#{slug}"] = status.attrs[:id]
end
rescue ::Twitter::Error::ClientError
Qswarm.logger.info "[#{@agent.name.inspect} #{@name.inspect}] Twitter REST API client error"
end
end
end
end
end
dsl_call(&@on_connect) if @on_connect
end
|