Class: Akane::Recorder
- Inherits:
-
Object
show all
- Defined in:
- lib/akane/recorder.rb
Defined Under Namespace
Classes: RoundrobinFlags, Stop
Instance Method Summary
collapse
-
#dequeue(raise_errors = false) ⇒ Object
-
#initialize(storages, timeout: 20, logger: Logger.new(nil)) ⇒ Recorder
constructor
A new instance of Recorder.
-
#mark_as_deleted(account, user_id, tweet_id) ⇒ Object
-
#perform(action, account, *payload, raise_errors: false) ⇒ Object
-
#queue_length ⇒ Object
-
#record_event(account, event) ⇒ Object
-
#record_message(account, message) ⇒ Object
-
#record_tweet(account, tweet) ⇒ Object
-
#run(raise_errors = false) ⇒ Object
-
#stop! ⇒ Object
Constructor Details
#initialize(storages, timeout: 20, logger: Logger.new(nil)) ⇒ Recorder
Returns a new instance of Recorder.
8
9
10
11
12
13
14
15
|
# File 'lib/akane/recorder.rb', line 8
def initialize(storages, timeout: 20, logger: Logger.new(nil))
@storages = storages
@logger = logger
@queue = Queue.new
@recently_performed = RoundrobinFlags.new(1000)
@timeout = timeout
@stop = false
end
|
Instance Method Details
#dequeue(raise_errors = false) ⇒ Object
45
46
47
|
# File 'lib/akane/recorder.rb', line 45
def dequeue(raise_errors = false)
perform(*@queue.pop, raise_errors: raise_errors)
end
|
#mark_as_deleted(account, user_id, tweet_id) ⇒ Object
27
28
29
30
31
|
# File 'lib/akane/recorder.rb', line 27
def mark_as_deleted(account, user_id, )
return self if @stop
@queue << [:mark_as_deleted, account, user_id, ]
self
end
|
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/akane/recorder.rb', line 49
def perform(action, account, *payload, raise_errors: false)
if action == :record_tweet
= payload.last
return if @recently_performed[.id]
@recently_performed.flag!(.id)
if .
perform(:record_tweet, account, ., raise_errors: raise_errors)
end
end
@storages.each do |storage|
begin
Timeout.timeout(@timeout) do
storage.__send__(action, account, *payload)
end
rescue Timeout::Error => e
raise e if raise_errors
@logger.warn "#{storage.name} (#{action}) timed out"
rescue Interrupt, SignalException, SystemExit => e
raise e
rescue Exception => e
raise e if raise_errors
@logger.error "Error while recorder performing to #{storage.inspect}: #{e.inspect}"
@logger.error e.backtrace
end
end
end
|
#queue_length ⇒ Object
17
18
19
|
# File 'lib/akane/recorder.rb', line 17
def queue_length
@queue.size
end
|
#record_event(account, event) ⇒ Object
39
40
41
42
43
|
# File 'lib/akane/recorder.rb', line 39
def record_event(account, event)
return self if @stop
@queue << [:record_event, account, event]
self
end
|
#record_message(account, message) ⇒ Object
33
34
35
36
37
|
# File 'lib/akane/recorder.rb', line 33
def record_message(account, message)
return self if @stop
@queue << [:record_message, account, message]
self
end
|
21
22
23
24
25
|
# File 'lib/akane/recorder.rb', line 21
def (account, )
return self if @stop
@queue << [:record_tweet, account, ]
self
end
|
#run(raise_errors = false) ⇒ Object
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
# File 'lib/akane/recorder.rb', line 81
def run(raise_errors = false)
@running_thread = Thread.new do
loop do
begin
begin
self.dequeue(raise_errors)
rescue Interrupt, SignalException, Stop
end
if @stop
break if self.queue_length.zero?
@logger.info "processing queue: #{self.queue_length} remaining."
end
rescue Exception => e
raise e if raise_errors
@logger.error "Error while recorder dequing: #{e.inspect}"
@logger.error e.backtrace
end
end
@logger.info "Recorder stopped."
@stop = false
end
@running_thread.join
nil
end
|
#stop! ⇒ Object
109
110
111
112
|
# File 'lib/akane/recorder.rb', line 109
def stop!
@stop = true
@running_thread.raise Stop
end
|