Class: Meerkat::Backend::PG

Inherits:
Object
  • Object
show all
Defined in:
lib/meerkat/backend/pg.rb

Defined Under Namespace

Modules: SubscribeClient

Constant Summary collapse

TABLENAME =
'meerkat_pubsub'.freeze

Instance Method Summary collapse

Constructor Details

#initialize(pg_uri = nil) ⇒ PG

Returns a new instance of PG.



8
9
10
11
12
13
14
15
# File 'lib/meerkat/backend/pg.rb', line 8

# Opens the PostgreSQL connection and prepares the backend for pub/sub.
#
# The connection is obtained through +PGconn.connect+. The database server's
# current time (+SELECT now()+) is stored in +@last_check+, after which
# +create_table+ and +start_listener+ are invoked, in that order.
#
# @param pg_uri [Object, nil] forwarded unchanged to +PGconn.connect+
#   (presumably a connection URI string; confirm against the pg gem version
#   in use)
def initialize(pg_uri = nil)
  @subs = {} # route => list of subscriber callbacks
  @pg_uri = pg_uri
  @pg = PGconn.connect(@pg_uri)

  # Take the starting timestamp from the database server.
  now_row = @pg.exec('SELECT now() as now').first
  @last_check = now_row['now']

  create_table
  start_listener
end

Instance Method Details

#publish(route, json) ⇒ Object



17
18
19
20
21
22
23
# File 'lib/meerkat/backend/pg.rb', line 17

# Stores a message for +route+ and signals listeners that new data exists.
#
# The INSERT and the NOTIFY are issued inside a single transaction on the
# backend connection. Afterwards, rows whose +timestamp+ is more than five
# seconds old are deleted from the table.
#
# @param route [Object] value written to the +topic+ column
# @param json [Object] value written to the +json+ column (presumably an
#   already-serialized JSON string; confirm with callers)
# @return [Object] whatever the connection's +async_exec+ returns for the
#   cleanup DELETE
def publish(route, json)
  insert_sql = "INSERT INTO #{TABLENAME} (topic, json) VALUES ($1, $2)"

  @pg.transaction do |txn|
    txn.exec(insert_sql, [route, json])
    txn.exec("NOTIFY #{TABLENAME}")
  end

  # Prune old messages on every publish.
  @pg.async_exec("DELETE FROM #{TABLENAME} WHERE timestamp < now() - interval '5 seconds'")
end

#subscribe(route, &callback) ⇒ Object



25
26
27
28
29
30
31
32
# File 'lib/meerkat/backend/pg.rb', line 25

# Registers +callback+ as a subscriber of +route+.
#
# Callbacks are kept per route in +@subs+; the list for a route is created
# the first time that route is subscribed to.
#
# @param route [Object] key under which the callback is stored
# @param callback [Proc] block to register
# @return [Array] the pair +[route, callback]+, which is the identifier
#   accepted by {#unsubscribe}
def subscribe(route, &callback)
  listeners = (@subs[route] ||= [])
  listeners << callback
  [route, callback]
end

#unsubscribe(sid) ⇒ Object



34
35
36
# File 'lib/meerkat/backend/pg.rb', line 34

# Removes a previously registered callback.
#
# Unsubscribing from a route that has no subscriber list is a no-op, so the
# call is safe to repeat or to make with a stale identifier.
#
# @param sid [Array] the +[route, callback]+ pair returned by {#subscribe}
# @return [Proc, nil] the removed callback, or +nil+ when nothing was removed
def unsubscribe(sid)
  route, callback = sid
  callbacks = @subs[route]
  # A route that was never subscribed has no list; nothing to remove.
  callbacks.delete(callback) if callbacks
end