Class: SeapigServer

Inherits:
Object
  • Object
show all
Defined in:
lib/seapig/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri, options = {}) ⇒ SeapigServer

Returns a new instance of SeapigServer.



30
31
32
33
34
35
36
37
38
39
# File 'lib/seapig/client.rb', line 30

# Builds a client for the seapig server at +uri+ and immediately starts
# connecting.
#
# uri     - address handed to the WebSocket client when connecting.
# options - hash sent to the server in the 'client-options-set' message once
#           the socket opens (defaults to an empty hash).
#
# The three registries (slave, master and notifier objects) start out empty
# and are filled by #slave, #master and #notifier.
def initialize(uri, options={})
	@uri, @options = uri, options
	@connected = false

	# One registry per object role, each keyed by object id / pattern.
	@slave_objects, @master_objects, @notifier_objects = {}, {}, {}

	connect
end

Instance Attribute Details

#connected ⇒ Object (readonly)

Returns the value of attribute connected.



28
29
30
# File 'lib/seapig/client.rb', line 28

# Returns the current value of @connected. In the code of this class the flag
# is set to true by the socket's onopen handler and reset to false by
# #initialize, #connect and #disconnect.
def connected
  @connected
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.



28
29
30
# File 'lib/seapig/client.rb', line 28

# Returns @socket: the WebSocket client object created by the most recent
# #connect, or nil once #disconnect has run.
def socket
  @socket
end

Instance Method Details

#connect ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/seapig/client.rb', line 42

# Opens (or re-opens) the WebSocket connection to @uri and installs the
# socket's event handlers.
#
# * An already existing socket is closed first; its onclose handler is
#   replaced with a no-op beforehand so that this deliberate close does not
#   schedule yet another reconnect.
# * A periodic timer is created once and kept in @timeout_timer. Every 10
#   seconds it checks @last_communication_at (refreshed on connect, open,
#   message and ping) and calls connect again after 20 seconds of silence.
# * When the socket opens, the client options and every registered slave and
#   master object are announced to the server.
# * When the socket closes or reports an error, a reconnect is scheduled one
#   second later.
def connect

	if @socket
		@socket.onclose {}
		@socket.close
	end

	@timeout_timer ||= EM.add_periodic_timer(10) {
		next unless @socket
		next if Time.new.to_f - @last_communication_at < 20
		puts "Seapig ping timeout, reconnecting"
		connect
	}

	@connected = false

	@last_communication_at = Time.new.to_f
	@socket = WebSocket::EventMachine::Client.connect(uri: @uri)

	@socket.onopen {
		puts 'Connected to seapig server'
		@connected = true
		@socket.send JSON.dump(action: 'client-options-set', options: @options)
		@slave_objects.each_pair { |id, object|
			@socket.send JSON.dump(action: 'object-consumer-register', id: id, "known-version" => object.version)
		}
		@master_objects.each_pair { |pattern, object|
			@socket.send JSON.dump(action: 'object-producer-register', pattern: pattern, "known-version" => object.version)
		}
		@last_communication_at = Time.new.to_f
	}

	@socket.onmessage { |message|
		# NOTE(review): JSON.load is applied to data received from the network;
		# JSON.parse is the stricter call. Not switched here because it raises on
		# nil/empty input where JSON.load returns nil - confirm before changing.
		message = JSON.load message
		#p message['action'], message['id'], message['patch']
		case message['action']
		when 'object-update'
			@slave_objects.values.each { |object|
				object.patch(message) if object.matches?(message['id'])
			}
		when 'object-destroy'
			@slave_objects.values.each { |object|
				object.destroy(message) if object.matches?(message['id'])
			}
		when 'object-produce'
			# A registered key handles the id when it is a '*' pattern matching the
			# id, or when it equals the id exactly.
			# NOTE(review): the generated regexp is not anchored, so a pattern can
			# match anywhere inside the id - confirm this is intended.
			handler = @master_objects.keys.find { |key|
				(key.include?('*') && (message['id'] =~ Regexp.new(Regexp.escape(key).gsub('\*','.*?')))) || (message['id'] == key)
			}
			# No master may be registered for the requested id (handler is nil);
			# in that case the request is ignored instead of calling a method on nil.
			producer = @master_objects[handler]
			producer.onproduce_proc.call(message['id']) if producer && producer.onproduce_proc
			# Looked up again rather than reusing `producer`, as the original code
			# did, so an entry removed by the callback is not uploaded.
			@master_objects[handler].upload(0,{},message['id']) if @master_objects[handler]
		else
			p :wtf, message
		end
		@last_communication_at = Time.new.to_f
	}

	@socket.onclose { |code, reason|
		puts 'Seapig connection died unexpectedly (code:'+code.inspect+', reason:'+reason.inspect+'), reconnecting in 1s'
		EM.add_timer(1) { connect }
	}

	@socket.onerror { |error|
		puts 'Seapig error: '+error.inspect
		# NOTE(review): if this close also triggers the onclose handler above,
		# two reconnects are scheduled - confirm against the WebSocket client.
		@socket.close
		EM.add_timer(1) { connect }
	}

	@socket.onping {
		@last_communication_at = Time.new.to_f
	}

end

#detach_fd ⇒ Object



132
133
134
# File 'lib/seapig/client.rb', line 132

# Shuts the connection down through disconnect(true), i.e. the branch that
# detaches the socket and closes the resulting IO object instead of calling
# @socket.close.
# NOTE(review): presumably meant for dropping the descriptor without a
# WebSocket-level close (e.g. in a forked child) - confirm with callers.
def detach_fd
	disconnect(true)
end

#disconnect(detach_fd = false) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/seapig/client.rb', line 114

# Tears the connection down without scheduling a reconnect.
#
# detach_fd - when true, the socket is detached and the value returned by
#             @socket.detach is wrapped in an IO object that is closed
#             directly; when false (the default), @socket.close is called.
#
# The timeout timer is cancelled and forgotten, @connected is reset and
# @socket is set to nil. Always returns nil.
def disconnect(detach_fd = false)
	@connected = false

	if (watchdog = @timeout_timer)
		watchdog.cancel
		@timeout_timer = nil
	end

	return unless @socket

	# Silence the close handler first so this shutdown does not trigger the
	# reconnect that #connect wires into onclose.
	@socket.onclose {}
	detach_fd ? IO.new(@socket.detach).close : @socket.close
	@socket = nil
end

#master(object_id) ⇒ Object



144
145
146
147
148
149
# File 'lib/seapig/client.rb', line 144

# Registers this client as the producer ("master") for +object_id+.
#
# object_id - id, or pattern, of the object(s) this client produces; it is
#             sent to the server as the 'pattern' of the registration.
#
# A SeapigObject is created with its version set to the current time in
# microseconds and stored in @master_objects. The registration message is
# sent right away only when the connection is up.
#
# Returns the newly created SeapigObject.
def master(object_id)
	producer = SeapigObject.new(self, object_id)
	producer.version = (Time.new.to_f*1000000).to_i

	if @connected
		@socket.send(JSON.dump(action: 'object-producer-register', pattern: object_id))
	end

	@master_objects[object_id] = producer
end

#notifier(object_id) ⇒ Object



152
153
154
155
156
# File 'lib/seapig/client.rb', line 152

# Creates a SeapigObject for +object_id+ with its version set to 0 and
# records it in @notifier_objects. Nothing is sent to the server here.
#
# Returns the newly created SeapigObject.
def notifier(object_id)
	@notifier_objects[object_id] = SeapigObject.new(self, object_id).tap { |fresh| fresh.version = 0 }
end

#slave(object_id) ⇒ Object



137
138
139
140
141
# File 'lib/seapig/client.rb', line 137

# Registers this client as a consumer ("slave") of +object_id+.
#
# object_id - id of the object to follow; an id containing '*' creates a
#             SeapigWildcardObject, any other id a plain SeapigObject.
#
# The object is stored in @slave_objects. When the connection is up the
# registration is sent immediately, otherwise it goes out from the onopen
# handler installed by #connect.
#
# The version is sent under the "known-version" key, the same key the onopen
# handler uses for this very action; the previous `latest_known_version` key
# matched nothing else in the protocol messages of this class.
#
# Returns the newly created object.
def slave(object_id)
	object_class = object_id.include?('*') ? SeapigWildcardObject : SeapigObject
	object = object_class.new(self, object_id)
	@socket.send JSON.dump(action: 'object-consumer-register', id: object_id, "known-version" => object.version) if @connected
	@slave_objects[object_id] = object
end