Module: Busdriver
- Extended by: Press
- Defined in: lib/busdriver.rb
Constant Summary
- TIMEOUT = 3
- TIME_TO_LIVE = 30
- TIME_TO_EXPIRE = 90
Class Method Summary
- .conn ⇒ Object
- .connect(url) ⇒ Object
- .conns ⇒ Object
- .counts(pattern) ⇒ Object
- .drain(pattern) ⇒ Object
- .header_format ⇒ Object
- .publish(key, data) ⇒ Object
- .subscribe(keys, &blk) ⇒ Object
- .time_to_expire ⇒ Object
- .time_to_expire=(time_to_expire) ⇒ Object
- .time_to_live ⇒ Object
- .time_to_live=(time_to_live) ⇒ Object
- .timeout ⇒ Object
- .timeout=(timeout) ⇒ Object
- .url ⇒ Object
- .url=(url) ⇒ Object
- .urls ⇒ Object
- .urls=(urls) ⇒ Object
Class Method Details
.conn ⇒ Object
63 64 65 |
# File 'lib/busdriver.rb', line 63
# Memoized connection for the URL returned by +url+.
#
# The first call builds the connection with +connect(url)+ and caches it in
# +@conn+; later calls return the cached object.
#
# @return [Object] whatever +connect+ returned on the first call
def self.conn
  return @conn if @conn

  @conn = connect(url)
end
.connect(url) ⇒ Object
55 56 57 |
# File 'lib/busdriver.rb', line 55
# Opens a new Redis connection to +url+ using the configured +timeout+.
#
# Calls +Redis.new+ instead of the +Redis.connect+ alias: the alias was
# removed in redis-rb 4.0 (per that gem's changelog), while +Redis.new+
# accepts the same options hash.
#
# @param url [String] Redis URL to connect to
# @return [Redis] a new client instance
def self.connect(url)
  Redis.new(url: url, timeout: timeout)
end
.conns ⇒ Object
59 60 61 |
# File 'lib/busdriver.rb', line 59
# Memoized list of connections, one per entry in +urls+.
#
# The list is built on first use by calling +connect+ for each URL, in the
# order +urls+ returns them, and is cached in +@conns+.
#
# @return [Array] one +connect+ result per configured URL
def self.conns
  return @conns if @conns

  @conns = urls.map { |endpoint| connect(endpoint) }
end
.counts(pattern) ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/busdriver.rb', line 114
# Logs the length of every list whose key matches +pattern+, summed over all
# connections.
#
# Connections are visited in random order. For each one, the matching keys
# are fetched with +keys+ and measured with +llen+. If a connection raises,
# the error is logged via +pdfme+ together with that connection's host and
# the remaining connections are still visited; whatever was counted before
# the error is kept.
#
# @param pattern [String] key pattern handed to +keys+ on each connection
# @return [Object] the result of the final +pdfm+ call, which receives the
#   per-key totals and the overall total as +length:+
def self.counts(pattern)
  total = 0
  per_key = Hash.new(0)

  conns.shuffle.each do |redis|
    begin
      redis.keys(pattern).each do |name|
        size = redis.llen(name)
        total += size
        per_key[name] += size
      end
    rescue => err
      pdfme(__FILE__, __method__, err, host: redis.client.host)
    end
  end

  pdfm(__FILE__, __method__, per_key, length: total)
end
.drain(pattern) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/busdriver.rb', line 130
# Deletes every key matching +pattern+ on every connection.
#
# Connections are visited in random order. Each matching key is removed with
# +del+ and then logged via +pdfm+. If a connection raises, the error is
# logged via +pdfme+ with that connection's host and the remaining
# connections are still processed.
#
# @param pattern [String] key pattern handed to +keys+ on each connection
# @return [Array] the shuffled list of connections that was iterated
def self.drain(pattern)
  conns.shuffle.each do |redis|
    begin
      redis.keys(pattern).each do |name|
        redis.del(name)
        pdfm(__FILE__, __method__, key: name)
      end
    rescue => err
      pdfme(__FILE__, __method__, err, host: redis.client.host)
    end
  end
end
.header_format ⇒ Object
67 68 69 |
# File 'lib/busdriver.rb', line 67
# Builds the header attached to each published message.
#
# @return [Hash] with +:message_id+ (a fresh UUID), +:published_on+ (current
#   time as integer seconds) and +:ttl+ (the current +time_to_live+)
def self.header_format
  header = {}
  header[:message_id] = SecureRandom.uuid
  header[:published_on] = Time.now.to_i
  header[:ttl] = time_to_live
  header
end
.publish(key, data) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/busdriver.rb', line 71
# Publishes +data+ under +key+ on every configured connection.
#
# The message is serialized once as JSON with a +header+ (from
# +header_format+) and a +payload+, then pushed onto the list +key+ of each
# connection, visited in random order. After a successful push the key's
# expiry is set to +time_to_expire+.
#
# Failures are logged via +pdfme+ with the connection's host and never
# raised to the caller: a failed push skips that connection; a failed expiry
# is logged too, but the message still counts as published.
#
# @param key [String] list key to push the message onto
# @param data [Object] JSON-serializable payload
# @return [Array] the shuffled list of connections that was iterated
def self.publish(key, data)
  header = header_format
  # Explicit braces so the hash is always the object being dumped rather
  # than being read as trailing options.
  payload_json = JSON.dump({ header: header, payload: data })
  pdfm __FILE__, __method__, header, key: key

  conns.shuffle.each do |conn|
    begin
      conn.rpush(key, payload_json)
      begin
        conn.expire(key, time_to_expire)
      rescue => e
        # Expiry is best-effort: the message is already queued, so report
        # the failure instead of hiding it, then carry on.
        pdfme __FILE__, __method__, e, host: conn.client.host
      end
      pdfm __FILE__, __method__, at: "published", key: key
    rescue => e
      pdfme __FILE__, __method__, e, host: conn.client.host
    end
  end
end
.subscribe(keys, &blk) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/busdriver.rb', line 86
# Blocks forever, popping messages from +keys+ on +conn+ and yielding each
# live message to the given block.
#
# Each iteration does a +blpop+ with a 1-second timeout. A popped message is
# parsed as JSON into its +header+ and +payload+. Messages older than their
# TTL are logged with +at: "timeout"+ and dropped; the rest are yielded as
# +(key, payload)+. An error raised by the block is logged via +pdfme+ and
# the loop continues. Any other error (connection, parsing) is logged with
# the connection's host and re-raised, ending the loop.
#
# The header's +ttl+ is coerced with +to_i+, and +time_to_live+ is used when
# the header has none. A TTL that arrives as a String or is absent would
# otherwise make the Integer comparison raise and stop the subscriber.
#
# @param keys [String, Array<String>] list key(s) to pop from
# @param blk [Proc] accepted for the signature; the block is invoked via
#   +yield+
# @yieldparam key [String] the list the message was popped from
# @yieldparam data [Object] the decoded +payload+ of the message
# @return [void] only leaves by raising
def self.subscribe(keys, &blk)
  while true
    begin
      key, payload_json = conn.blpop(*keys, 1)
      # blpop returns nil when the timeout elapses with nothing to pop.
      next unless payload_json

      payload = JSON.parse(payload_json)
      header, data = payload.values_at("header", "payload")
      published_on, ttl = header.values_at("published_on", "ttl")
      pdfm __FILE__, __method__, header, key: key

      max_age = (ttl || time_to_live).to_i
      if Time.now.to_i - published_on.to_i > max_age
        pdfm __FILE__, __method__, header, at: "timeout", key: key
        next
      end

      begin
        pdfm __FILE__, __method__, header, at: "received", key: key
        yield key, data
        pdfm __FILE__, __method__, header, at: "processed", key: key
      rescue => e
        # A failing handler must not take the subscriber down.
        pdfme __FILE__, __method__, e
      end
    rescue => e
      pdfme __FILE__, __method__, e, host: conn.client.host
      raise
    end
  end
end
.time_to_expire ⇒ Object
35 36 37 |
# File 'lib/busdriver.rb', line 35
# Seconds after which a published key expires.
#
# Resolution order: a value assigned through +time_to_expire=+, then the
# +BUSDRIVER_TIME_TO_EXPIRE+ environment variable, then +TIME_TO_EXPIRE+.
# The environment value is parsed as a base-10 integer so the method returns
# an Integer whichever source supplied it; the result is memoized.
#
# @return [Integer] expiry in seconds
# @raise [ArgumentError] if the environment variable is set but is not a
#   valid integer
def self.time_to_expire
  @time_to_expire ||= begin
    raw = ENV['BUSDRIVER_TIME_TO_EXPIRE']
    # Base 10 is explicit so a leading zero is not read as octal.
    raw ? Integer(raw, 10) : TIME_TO_EXPIRE
  end
end
.time_to_expire=(time_to_expire) ⇒ Object
31 32 33 |
# File 'lib/busdriver.rb', line 31
# Overrides the key expiry returned by +time_to_expire+.
#
# @param time_to_expire [Object] value to store in +@time_to_expire+
# @return [Object] the assigned value
def self.time_to_expire=(time_to_expire)
  @time_to_expire = time_to_expire
end
.time_to_live ⇒ Object
25 26 27 |
# File 'lib/busdriver.rb', line 25
# Message time-to-live in seconds, as written into each message header.
#
# Resolution order: a value assigned through +time_to_live=+, then the
# +BUSDRIVER_TIME_TO_LIVE+ environment variable, then +TIME_TO_LIVE+.
# The environment value is parsed as a base-10 integer. Returning the raw
# String would put a String +ttl+ into published headers, which cannot be
# compared against an age in seconds.
#
# @return [Integer] time-to-live in seconds
# @raise [ArgumentError] if the environment variable is set but is not a
#   valid integer
def self.time_to_live
  @time_to_live ||= begin
    raw = ENV['BUSDRIVER_TIME_TO_LIVE']
    # Base 10 is explicit so a leading zero is not read as octal.
    raw ? Integer(raw, 10) : TIME_TO_LIVE
  end
end
.time_to_live=(time_to_live) ⇒ Object
21 22 23 |
# File 'lib/busdriver.rb', line 21
# Overrides the time-to-live returned by +time_to_live+.
#
# @param time_to_live [Object] value to store in +@time_to_live+
# @return [Object] the assigned value
def self.time_to_live=(time_to_live)
  @time_to_live = time_to_live
end
.timeout ⇒ Object
15 16 17 |
# File 'lib/busdriver.rb', line 15
# Connection timeout in seconds, passed to Redis by +connect+.
#
# Resolution order: a value assigned through +timeout=+, then the
# +BUSDRIVER_TIMEOUT+ environment variable, then +TIMEOUT+. The environment
# value is parsed with +Float+ (fractional seconds are allowed) so the
# method returns a number whichever source supplied it; the result is
# memoized.
#
# @return [Numeric] timeout in seconds
# @raise [ArgumentError] if the environment variable is set but is not a
#   valid number
def self.timeout
  @timeout ||= begin
    raw = ENV['BUSDRIVER_TIMEOUT']
    raw ? Float(raw) : TIMEOUT
  end
end
.timeout=(timeout) ⇒ Object
11 12 13 |
# File 'lib/busdriver.rb', line 11
# Overrides the connection timeout returned by +timeout+.
#
# @param timeout [Object] value to store in +@timeout+
# @return [Object] the assigned value
def self.timeout=(timeout)
  @timeout = timeout
end
.url ⇒ Object
51 52 53 |
# File 'lib/busdriver.rb', line 51
# URL used for this process's primary connection (see +conn+).
#
# Unless set through +url=+, one entry of +urls+ is picked from the
# +BUSDRIVER_ZONE+ environment variable: the code point of its first
# character modulo the number of URLs. When the variable is unset or empty
# the first URL is used instead of raising on the missing value. The result
# is memoized.
#
# @return [String, nil] the selected URL; nil when +urls+ is empty and no
#   zone is set
def self.url
  @url ||= begin
    zone = ENV['BUSDRIVER_ZONE'].to_s
    index = zone.empty? ? 0 : zone.ord % urls.size
    urls[index]
  end
end
.url=(url) ⇒ Object
47 48 49 |
# File 'lib/busdriver.rb', line 47
# Overrides the URL returned by +url+.
#
# @param url [Object] value to store in +@url+
# @return [Object] the assigned value
def self.url=(url)
  @url = url
end
.urls ⇒ Object
43 44 45 |
# File 'lib/busdriver.rb', line 43
# List of Redis URLs to publish to.
#
# Unless set through +urls=+, the list is read from the comma-separated
# +BUSDRIVER_URLS+ environment variable. Whitespace around each entry is
# stripped and empty entries are dropped, so "a, b," yields two URLs. The
# result is memoized.
#
# @return [Array<String>] configured URLs
# @raise [KeyError] if +BUSDRIVER_URLS+ is not set (the message names the
#   missing variable)
def self.urls
  @urls ||= ENV.fetch('BUSDRIVER_URLS').split(',').map(&:strip).reject(&:empty?)
end
.urls=(urls) ⇒ Object
39 40 41 |
# File 'lib/busdriver.rb', line 39
# Overrides the URL list returned by +urls+.
#
# @param urls [Object] value to store in +@urls+
# @return [Object] the assigned value
def self.urls=(urls)
  @urls = urls
end