Module: Busdriver

Extended by:
Press
Defined in:
lib/busdriver.rb

Constant Summary collapse

TIMEOUT = 3
TIME_TO_LIVE = 30
TIME_TO_EXPIRE = 90

Class Method Summary collapse

Class Method Details

.conn ⇒ Object



63
64
65
# File 'lib/busdriver.rb', line 63

# Returns the single connection used by this process, creating it on first
# use from `url` and reusing the same object afterwards.
#
# @return [Object] whatever `connect(url)` returned on the first call
def self.conn
  return @conn if @conn

  @conn = connect(url)
end

.connect(url) ⇒ Object



55
56
57
# File 'lib/busdriver.rb', line 55

# Builds a Redis client for the given URL with the configured timeout.
#
# `Redis.new` is used instead of `Redis.connect`: redis-rb marks
# `Redis.connect` as deprecated (it simply forwards to `new`) and removes it
# in 4.0, while `Redis.new` accepts the same options.
#
# @param url [String] Redis URL, passed through as the `:url` option
# @return [Object] the client returned by `Redis.new`
def self.connect(url)
  Redis.new(url: url, timeout: timeout)
end

.conns ⇒ Object



59
60
61
# File 'lib/busdriver.rb', line 59

# Returns one connection per entry in `urls`, built on first use and reused
# on every later call.
#
# @return [Array<Object>] the results of `connect` for each URL, in `urls` order
def self.conns
  return @conns if @conns

  @conns = urls.map do |address|
    connect(address)
  end
end

.counts(pattern) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/busdriver.rb', line 114

# Sums the list lengths of every key matching +pattern+ across all
# connections and reports the per-key lengths plus the grand total via `pdfm`.
#
# A connection that raises is reported via `pdfme` (with its host) and
# skipped; lengths gathered before the failure are kept.
#
# @param pattern [String] key pattern handed to `keys` on each connection
# @return [Object] whatever the final `pdfm` call returns
def self.counts(pattern)
  per_key = Hash.new(0)
  conns.shuffle.each do |redis|
    begin
      redis.keys(pattern).each do |name|
        per_key[name] += redis.llen(name)
      end
    rescue => err
      pdfme __FILE__, __method__, err, host: redis.client.host
    end
  end
  # Every length is recorded under its key, so the grand total is the sum of
  # the per-key values.
  total = per_key.values.inject(0) { |sum, size| sum + size }
  pdfm __FILE__, __method__, per_key, length: total
end

.drain(pattern) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/busdriver.rb', line 130

# Deletes every key matching +pattern+ on each connection, reporting each
# deleted key via `pdfm`.
#
# A connection that raises is reported via `pdfme` (with its host) and the
# remaining connections are still processed.
#
# @param pattern [String] key pattern handed to `keys` on each connection
# @return [Array<Object>] the shuffled list of connections that was iterated
def self.drain(pattern)
  conns.shuffle.each do |redis|
    begin
      matching = redis.keys(pattern)
      matching.each do |name|
        redis.del(name)
        pdfm __FILE__, __method__, key: name
      end
    rescue => err
      pdfme __FILE__, __method__, err, host: redis.client.host
    end
  end
end

.header_format ⇒ Object



67
68
69
# File 'lib/busdriver.rb', line 67

# Builds a fresh message header: a random UUID, the current Unix time in
# seconds, and the configured time-to-live.
#
# @return [Hash] with keys :message_id, :published_on and :ttl
def self.header_format
  header = {}
  header[:message_id] = SecureRandom.uuid
  header[:published_on] = Time.now.to_i
  header[:ttl] = time_to_live
  header
end

.publish(key, data) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/busdriver.rb', line 71

# Publishes +data+ under +key+ on every connection.
#
# The message is serialized once as JSON ({header:, payload:}) and pushed to
# the tail of the list +key+ on each connection, after which the key's expiry
# is refreshed to `time_to_expire`. A connection that fails to accept the push
# is reported via `pdfme` and the remaining connections are still tried.
#
# @param key [String] list key to push onto
# @param data [Object] JSON-serializable payload
# @return [Array<Object>] the shuffled list of connections that was iterated
def self.publish(key, data)
  header = header_format
  payload_json = JSON.dump({ header: header, payload: data })
  pdfm __FILE__, __method__, header, key: key
  conns.shuffle.each do |conn|
    begin
      conn.rpush(key, payload_json)
      begin
        conn.expire(key, time_to_expire)
      rescue => expire_error
        # Setting the expiry stays best-effort (the message is already
        # queued), but the failure is logged instead of being silently
        # discarded so that keys left without a TTL can be traced.
        pdfme __FILE__, __method__, expire_error, at: "expire", key: key
      end
      pdfm __FILE__, __method__, at: "published", key: key
    rescue => e
      pdfme __FILE__, __method__, e, host: conn.client.host
    end
  end
end

.subscribe(keys, &blk) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/busdriver.rb', line 86

# Loops forever, popping messages from the given list key(s) on `conn` and
# yielding each unexpired one to the caller's block.
#
# Each message is JSON with a "header" (carrying "published_on" and "ttl")
# and a "payload". A message whose age in seconds exceeds its ttl is logged
# with at: "timeout" and skipped. An error raised by the block is logged via
# `pdfme` and swallowed so the loop continues. Any other error (failed pop,
# malformed message) is logged with the connection host and re-raised, which
# ends the loop.
#
# @param keys [Array<String>, String] list key(s), splatted into `blpop`
# @yield [key, data] the list key the message came from and its payload
# @return [void] only leaves by raising
def self.subscribe(keys, &blk)
  while true
    begin
      # 1-second blocking pop; yields nil when nothing arrived in time.
      key, payload_json = conn.blpop(*keys, 1)
      if payload_json
        payload = JSON.parse(payload_json)
        header, data = payload.values_at("header", "payload")
        published_on, ttl = header.values_at("published_on", "ttl")
        pdfm __FILE__, __method__, header, key: key
        age = Time.now.to_i - published_on.to_i
        # ttl is coerced like published_on: the publisher's TTL can come from
        # an environment variable and so arrive as a String, and comparing an
        # Integer with a String raises ArgumentError, which would end the
        # loop. A missing ttl counts as 0.
        if age > ttl.to_i
          pdfm __FILE__, __method__, header, at: "timeout", key: key
        else
          begin
            pdfm __FILE__, __method__, header, at: "received", key: key
            yield key, data
            pdfm __FILE__, __method__, header, at: "processed", key: key
          rescue => e
            pdfme __FILE__, __method__, e
          end
        end
      end
    rescue => e
      pdfme __FILE__, __method__, e, host: conn.client.host
      raise e
    end
  end
end

.time_to_expire ⇒ Object



35
36
37
# File 'lib/busdriver.rb', line 35

# Number of seconds after which a published key expires.
#
# Resolution order: a value assigned through `time_to_expire=` (returned
# as-is), then the BUSDRIVER_TIME_TO_EXPIRE environment variable, then the
# TIME_TO_EXPIRE constant. The environment value is parsed as a base-10
# integer so the method returns an Integer whether the value came from the
# environment or from the constant.
#
# @return [Integer] seconds
# @raise [ArgumentError] if the environment variable is not a valid integer
def self.time_to_expire
  return @time_to_expire if @time_to_expire

  configured = ENV['BUSDRIVER_TIME_TO_EXPIRE']
  # Explicit base 10 so a leading zero is not read as octal.
  @time_to_expire = configured ? Integer(configured, 10) : TIME_TO_EXPIRE
end

.time_to_expire=(time_to_expire) ⇒ Object



31
32
33
# File 'lib/busdriver.rb', line 31

class << self
  # Overrides the key expiry; the assigned value is stored in
  # @time_to_expire and takes precedence in `time_to_expire`.
  attr_writer :time_to_expire
end

.time_to_live ⇒ Object



25
26
27
# File 'lib/busdriver.rb', line 25

# Number of seconds a message stays valid after being published.
#
# Resolution order: a value assigned through `time_to_live=` (returned
# as-is), then the BUSDRIVER_TIME_TO_LIVE environment variable, then the
# TIME_TO_LIVE constant. The environment value is parsed as a base-10 integer:
# this value is written into each message header as `ttl`, and `subscribe`
# compares a message's age against it numerically, so a raw String from the
# environment would break that comparison.
#
# @return [Integer] seconds
# @raise [ArgumentError] if the environment variable is not a valid integer
def self.time_to_live
  return @time_to_live if @time_to_live

  configured = ENV['BUSDRIVER_TIME_TO_LIVE']
  # Explicit base 10 so a leading zero is not read as octal.
  @time_to_live = configured ? Integer(configured, 10) : TIME_TO_LIVE
end

.time_to_live=(time_to_live) ⇒ Object



21
22
23
# File 'lib/busdriver.rb', line 21

class << self
  # Overrides the message time-to-live; the assigned value is stored in
  # @time_to_live and takes precedence in `time_to_live`.
  attr_writer :time_to_live
end

.timeout ⇒ Object



15
16
17
# File 'lib/busdriver.rb', line 15

# Timeout passed to new connections (see `connect`).
#
# Resolution order: a value assigned through `timeout=`, then the
# BUSDRIVER_TIMEOUT environment variable, then the TIMEOUT constant. The
# value is returned as-is, so it is a String when it comes from the
# environment.
#
# @return [Object] the configured timeout
def self.timeout
  return @timeout if @timeout

  @timeout = ENV.fetch('BUSDRIVER_TIMEOUT') { TIMEOUT }
end

.timeout=(timeout) ⇒ Object



11
12
13
# File 'lib/busdriver.rb', line 11

class << self
  # Overrides the connection timeout; the assigned value is stored in
  # @timeout and takes precedence in `timeout`.
  attr_writer :timeout
end

.url ⇒ Object



51
52
53
# File 'lib/busdriver.rb', line 51

# The Redis URL this process reads from (see `conn`).
#
# Unless a URL was assigned through `url=`, one entry of `urls` is picked
# from the BUSDRIVER_ZONE environment variable: the code point of the zone's
# first character, modulo the number of URLs, is the index. The result is
# memoized.
#
# @return [String] the selected URL
# @raise [KeyError] if no URL was assigned and BUSDRIVER_ZONE is not set
def self.url
  return @url if @url

  # ENV.fetch makes a missing zone fail with a KeyError naming the variable
  # rather than a NoMethodError on nil.
  zone = ENV.fetch('BUSDRIVER_ZONE')
  @url = urls[zone.ord % urls.size]
end

.url=(url) ⇒ Object



47
48
49
# File 'lib/busdriver.rb', line 47

# Overrides the URL used by `conn`.
#
# `conn` memoizes its connection, so when the URL actually changes the
# memoized connection is dropped; the next `conn` call then connects to the
# new URL instead of silently reusing the old one. Assigning the current
# value again keeps the existing connection.
#
# @param url [String] Redis URL
# @return [String] the assigned URL
def self.url=(url)
  @conn = nil unless url == @url
  @url = url
end

.urls ⇒ Object



43
44
45
# File 'lib/busdriver.rb', line 43

# All Redis URLs messages are published to (see `conns`).
#
# Unless a list was assigned through `urls=`, the comma-separated
# BUSDRIVER_URLS environment variable is parsed once and memoized. Whitespace
# around each entry is stripped and empty entries are dropped, so values such
# as "a, b" or "a,,b" do not produce malformed or blank URLs.
#
# @return [Array<String>] the configured URLs
# @raise [KeyError] if no list was assigned and BUSDRIVER_URLS is not set
def self.urls
  return @urls if @urls

  # ENV.fetch makes a missing variable fail with a KeyError naming it rather
  # than a NoMethodError on nil.
  raw = ENV.fetch('BUSDRIVER_URLS')
  @urls = raw.split(",").map(&:strip).reject(&:empty?)
end

.urls=(urls) ⇒ Object



39
40
41
# File 'lib/busdriver.rb', line 39

# Overrides the list of URLs used by `conns`.
#
# `conns` memoizes its connections, so when the list actually changes the
# memoized connections are dropped; the next `conns` call then connects to
# the new URLs instead of silently reusing the old ones. Assigning an equal
# list keeps the existing connections. @url is left untouched because it may
# have been assigned explicitly.
#
# @param urls [Array<String>] Redis URLs
# @return [Array<String>] the assigned list
def self.urls=(urls)
  @conns = nil unless urls == @urls
  @urls = urls
end