Class: Smartware::PubSubServer

Inherits:
Object
  • Object
show all
Defined in:
lib/smartware/pub_sub_server.rb

Defined Under Namespace

Classes: PubSubConnection

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port) ⇒ PubSubServer

Returns a new instance of PubSubServer.



48
49
50
51
52
53
54
# File 'lib/smartware/pub_sub_server.rb', line 48

# Builds the server state and starts listening for pub/sub clients.
#
# The connection registry starts empty and no repush callback is set;
# +self+ is handed to EventMachine as the extra handler argument so that
# each PubSubConnection can reach this server.
#
# @param host [Object] address passed through to EventMachine.start_server
# @param port [Object] port passed through to EventMachine.start_server
def initialize(host, port)
  @connections = Set.new
  # Redis client built with no arguments, i.e. whatever the client's
  # defaults are.
  @redis = Redis.new
  @repush = nil

  EventMachine.start_server(host, port, PubSubConnection, self)
end

Instance Attribute Details

#repush ⇒ Object

Returns the value of attribute repush.



46
47
48
# File 'lib/smartware/pub_sub_server.rb', line 46

# Reader for the repush attribute.
#
# The constructor initialises it to +nil+; #add_connection invokes it via
# +call+ with each newly added connection.
#
# @return [Object, nil] the current value of @repush
def repush
  @repush
end

Instance Method Details

#acknowlege_reliable(id) ⇒ Object



87
88
89
# File 'lib/smartware/pub_sub_server.rb', line 87

# Drops the entry stored under +id+ from the "smartware:reliable_events"
# Redis hash, so it is no longer replayed to connections added later.
#
# @param id [Object] field name of the event within the hash
# @return [Object] whatever the Redis client's +hdel+ returns
def acknowlege_reliable(id)
  @redis.hdel("smartware:reliable_events", id)
end

#add_connection(connection) ⇒ Object



56
57
58
59
60
61
62
63
64
65
# File 'lib/smartware/pub_sub_server.rb', line 56

# Registers a new connection and brings it up to date.
#
# The connection is added to the registry, handed to the repush callback
# (when one is set), and then sent every event currently stored in the
# "smartware:reliable_events" Redis hash.
#
# @param connection [Object] must respond to +publish_reliable_event+
# @return [Object] the collection returned by +hgetall+
def add_connection(connection)
  @connections.add connection

  # @repush is nil until a callback is assigned; calling it blindly would
  # raise NoMethodError and skip the replay of stored events below.
  @repush.call connection if @repush

  @redis.hgetall("smartware:reliable_events").each do |id, payload|
    # Each value is the JSON document written by #publish_reliable_event:
    # an object with "key" and "args" entries.
    event = JSON.load(payload)

    connection.publish_reliable_event id, event["key"], *event["args"]
  end
end

#publish_event(key, *args) ⇒ Object



71
72
73
74
75
# File 'lib/smartware/pub_sub_server.rb', line 71

# Fans an event out to every registered connection.
#
# @param key [Object] event name forwarded unchanged
# @param args [Array] extra arguments forwarded unchanged
# @return [Object] the connection registry (result of +each+)
def publish_event(key, *args)
  @connections.each { |subscriber| subscriber.publish_event(key, *args) }
end

#publish_reliable_event(key, *args) ⇒ Object



77
78
79
80
81
82
83
84
85
# File 'lib/smartware/pub_sub_server.rb', line 77

# Persists an event and then fans it out to every registered connection.
#
# The event is written to the "smartware:reliable_events" Redis hash as a
# JSON object with "key" and "args" entries before any connection is
# notified; the entry is removed by #acknowlege_reliable.
#
# @param key [Object] event name
# @param args [Array] extra arguments stored and forwarded with the event
# @return [Object] the connection registry (result of +each+)
def publish_reliable_event(key, *args)
  # The id is the current wall-clock time in microseconds, as a string.
  # NOTE(review): two events published within the same microsecond would
  # presumably share an id and overwrite each other in the hash - confirm.
  microseconds = (Time.now.to_f * 1000000).round
  id = microseconds.to_s

  payload = JSON.dump({ key: key, args: args })
  @redis.hset("smartware:reliable_events", id, payload)

  @connections.each { |subscriber| subscriber.publish_reliable_event(id, key, *args) }
end

#remove_connection(connection) ⇒ Object



67
68
69
# File 'lib/smartware/pub_sub_server.rb', line 67

# Unregisters a connection so it no longer receives published events.
#
# @param connection [Object] the connection to drop from the registry
# @return [Object] the result of +delete+ on the connection registry
def remove_connection(connection)
  @connections.delete(connection)
end