Class: MessageBus::Backends::Memory::Client
- Inherits:
-
Object
- Object
- MessageBus::Backends::Memory::Client
show all
- Defined in:
- lib/message_bus/backends/memory.rb
Defined Under Namespace
Classes: Channel, Listener
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(_config) ⇒ Client
Returns a new instance of Client.
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/message_bus/backends/memory.rb', line 55
# Builds an in-memory client: a mutex, an empty listener list, a background
# timer that periodically calls #expire, and freshly reset state.
#
# @param _config [Object] accepted for interface compatibility; not read here
def initialize(_config)
  @mutex = Mutex.new
  @listeners = []
  @timer_thread = MessageBus::TimerThread.new.tap do |timer|
    # Report failures raised by timer jobs through the logger.
    timer.on_error do |err|
      logger.warn "Failed to process job: #{err} #{err.backtrace}"
    end
    # Run #expire on the timer (interval argument 1; presumably seconds).
    timer.every(1) { expire }
  end
  reset!
end
|
Instance Attribute Details
#max_backlog_age ⇒ Object
Returns the value of attribute max_backlog_age.
16
17
18
|
# File 'lib/message_bus/backends/memory.rb', line 16
# Reader for the max_backlog_age attribute.
#
# @return [Object] the current value of @max_backlog_age (nil when unset)
def max_backlog_age
  @max_backlog_age
end
|
Instance Method Details
#add(channel, value, max_backlog_age:) ⇒ Object
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/message_bus/backends/memory.rb', line 66
# Appends a message to a channel's backlog and fans the encoded message out
# to every currently registered listener queue.
#
# @param channel [Object] channel identifier handed to `chan`
# @param value [Object] message payload
# @param max_backlog_age [Object, nil] when truthy, stored as the channel's ttl
# @return [Integer] the ID assigned to the new message
def add(channel, value, max_backlog_age:)
  new_id = nil
  recipients = nil

  sync do
    new_id = (@global_id += 1)
    target = chan(channel)
    target.backlog << [new_id, value, Time.now]
    target.ttl = max_backlog_age if max_backlog_age
    # Snapshot the listeners so delivery happens outside the sync block.
    recipients = @listeners.dup
  end

  # The same ID is used for both ID arguments of the message.
  encoded = MessageBus::Message.new(new_id, new_id, channel, value).encode
  recipients.each { |queue| queue.push(encoded) }
  new_id
end
|
#backlog(channel, backlog_id) ⇒ Object
108
109
110
|
# File 'lib/message_bus/backends/memory.rb', line 108
# Returns the entries in a channel's backlog that are newer than a given ID.
#
# @param channel [Object] channel identifier handed to `chan`
# @param backlog_id [Integer] only entries with an ID greater than this are kept
# @return [Array] the matching backlog entries, in backlog order
def backlog(channel, backlog_id)
  sync do
    entries = chan(channel).backlog
    entries.select { |entry_id, _| entry_id > backlog_id }
  end
end
|
#clear_channel_backlog(channel, backlog_id, num_to_keep) ⇒ Object
102
103
104
105
106
|
# File 'lib/message_bus/backends/memory.rb', line 102
# Trims one channel's backlog, dropping every entry whose ID is at or below
# `backlog_id - num_to_keep`.
#
# @param channel [Object] channel identifier handed to `chan`
# @param backlog_id [Integer] reference message ID
# @param num_to_keep [Integer] how far below backlog_id entries are retained
# @return [nil]
def clear_channel_backlog(channel, backlog_id, num_to_keep)
  cutoff = backlog_id - num_to_keep
  sync do
    chan(channel).backlog.delete_if { |entry_id, _| entry_id <= cutoff }
  end
  nil
end
|
#clear_global_backlog(backlog_id, num_to_keep) ⇒ Object
90
91
92
93
94
95
96
97
98
99
100
|
# File 'lib/message_bus/backends/memory.rb', line 90
# Trims every channel's backlog, dropping entries whose ID is at or below
# `backlog_id - num_to_keep`. Does nothing unless backlog_id exceeds
# num_to_keep.
#
# @param backlog_id [Integer] reference message ID
# @param num_to_keep [Integer] how far below backlog_id entries are retained
# @return [nil]
def clear_global_backlog(backlog_id, num_to_keep)
  return unless backlog_id > num_to_keep

  cutoff = backlog_id - num_to_keep
  sync do
    @channels.each_value do |chan_obj|
      chan_obj.backlog.delete_if { |entry_id, _| entry_id <= cutoff }
    end
  end
  nil
end
|
#expire ⇒ Object
84
85
86
87
88
|
# File 'lib/message_bus/backends/memory.rb', line 84
# Removes every channel that reports itself as expired.
#
# @return [Object] whatever `sync` returns for the block (the channels hash
#   when `sync` passes the block value through)
def expire
  sync { @channels.delete_if { |_, chan_obj| chan_obj.expired? } }
end
|
#expire_all_backlogs! ⇒ Object
Use with extreme care: this discards every channel together with all of its backlogged messages.
133
134
135
136
137
|
# File 'lib/message_bus/backends/memory.rb', line 133
# Discards all channels (and therefore their backlogs) by replacing the
# channels hash with an empty one. The global ID counter is left untouched.
def expire_all_backlogs!
  sync { @channels = {} }
end
|
#get_value(channel, id) ⇒ Object
120
121
122
|
# File 'lib/message_bus/backends/memory.rb', line 120
# Looks up the payload of a single message in a channel's backlog.
#
# Previously a missing ID raised NoMethodError (indexing nil); it now
# returns nil so callers can treat "not found" as an ordinary result.
#
# @param channel [Object] channel identifier handed to `chan`
# @param id [Integer] ID of the message to look up
# @return [Object, nil] the stored value, or nil when no entry has that ID
def get_value(channel, id)
  sync do
    entry = chan(channel).backlog.find { |entry_id, _| entry_id == id }
    entry && entry[1]
  end
end
|
#global_backlog(backlog_id) ⇒ Object
112
113
114
115
116
117
118
|
# File 'lib/message_bus/backends/memory.rb', line 112
# Collects, across all channels, the backlog entries newer than a given ID.
#
# @param backlog_id [Integer] only entries with an ID greater than this are kept
# @return [Array<Array>] sorted `[id, channel_name, value]` triples
def global_backlog(backlog_id)
  sync do
    collected = []
    # Iterate over a copy of the channels hash, as the original did.
    @channels.dup.each do |name, chan_obj|
      chan_obj.backlog.each do |entry_id, value|
        collected << [entry_id, name, value] if entry_id > backlog_id
      end
    end
    collected.sort
  end
end
|
#max_id(channel = nil) ⇒ Object
139
140
141
142
143
144
145
146
147
148
149
|
# File 'lib/message_bus/backends/memory.rb', line 139
# Returns the ID of the newest message, either for one channel or globally.
#
# The global branch used to return `@global_id - 1`. Because #add increments
# the counter *before* assigning it to the message, the newest ID is
# `@global_id` itself; the old expression reported one less than the newest
# ID and -1 right after #reset!, which also bypassed the `|| 0` fallback.
#
# @param channel [Object, nil] channel identifier handed to `chan`; when nil
#   the global counter is consulted instead
# @return [Integer] the newest ID, or 0 when there is none
def max_id(channel = nil)
  newest =
    if channel
      sync do
        last_entry = chan(channel).backlog.last
        last_entry && last_entry.first
      end
    else
      sync { @global_id }
    end
  newest || 0
end
|
#reset! ⇒ Object
Dangerous: resets the global message ID counter to zero and discards all in-memory channels along with their backlogs.
125
126
127
128
129
130
|
# File 'lib/message_bus/backends/memory.rb', line 125
# Returns the client to its initial state: the global message ID counter goes
# back to zero and all channels are replaced with an empty hash.
def reset!
  sync do
    @global_id = 0
    @channels = {}
  end
end
|
#subscribe {|listener| ... } ⇒ Object
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
|
# File 'lib/message_bus/backends/memory.rb', line 151
# Registers a listener queue and blocks, dispatching each queued payload to
# the listener until a nil payload is received (see #unsubscribe).
#
# The queue is now removed from @listeners in an `ensure` clause. Before, an
# exception raised by any listener callback skipped the cleanup and left a
# dead queue registered, which #add would keep filling indefinitely.
#
# @yieldparam listener [Listener] object on which the caller configures the
#   do_sub / do_message / do_unsub callbacks
# @return [nil]
def subscribe
  listener = Listener.new
  yield listener

  queue = Queue.new
  sync { @listeners << queue }

  begin
    listener.do_sub.call
    # A nil (or false) payload ends the loop.
    while (msg = queue.pop)
      listener.do_message.call(nil, msg)
    end
    listener.do_unsub.call
  ensure
    # Always deregister, even when a callback raised.
    sync { @listeners.delete(queue) }
  end

  nil
end
|
#unsubscribe ⇒ Object
172
173
174
|
# File 'lib/message_bus/backends/memory.rb', line 172
# Pushes a nil payload onto every registered listener queue, which is the
# signal #subscribe uses to leave its receive loop.
def unsubscribe
  sync do
    @listeners.each { |queue| queue.push(nil) }
  end
end
|