Class: Goat::StateSrvConnection

Inherits:
EM::Connection
  • Object
show all
Includes:
EM::P::LineText2
Defined in:
lib/goat/state-srv.rb

Overview

This class is an ugly mix of synchronous and asynchronous code.

Constant Summary collapse

@@connection =
nil

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#pusher ⇒ Object

Returns the value of attribute pusher.



116
117
118
# File 'lib/goat/state-srv.rb', line 116

# Reader for the +pusher+ attribute.
#
# @return [Object] whatever is currently stored in @pusher (nil if unset)
def pusher; @pusher; end

Class Method Details

.connect(host, port, &dlg) ⇒ Object



72
73
74
75
76
# File 'lib/goat/state-srv.rb', line 72

# Remembers the target address on the class (so that reconnect can reuse
# it) and hands the connection attempt to EM.connect, passing this class
# as the third argument.
#
# @param host the address to connect to
# @param port the port to connect to
# @param dlg  optional block; accepted but not used by this method
# @return whatever EM.connect returns
def self.connect(host, port, &dlg)
  @host, @port = host, port
  EM.connect(host, port, self)
end

.connected? ⇒ Boolean

Returns:

  • (Boolean)


70
# File 'lib/goat/state-srv.rb', line 70

# Tells whether a connection object is currently registered on the class.
#
# @return [Boolean] true when @@connection is not nil
def self.connected?
  !@@connection.nil?
end

.connection ⇒ Object



68
# File 'lib/goat/state-srv.rb', line 68

# Class-level reader for the shared connection object.
#
# @return the object stored in @@connection, or nil when none is set
def self.connection
  @@connection
end

.connection=(c) ⇒ Object



69
# File 'lib/goat/state-srv.rb', line 69

# Class-level writer for the shared connection object.
#
# @param c the connection to register (nil clears it)
def self.connection=(c)
  @@connection = c
end

.reconnect ⇒ Object



78
79
80
81
# File 'lib/goat/state-srv.rb', line 78

# Re-runs connect with the host and port remembered from the previous
# connect call.
#
# @raise [RuntimeError] when no host or port has been recorded yet
# @return whatever connect returns
def self.reconnect
  if [@host, @port].any?(&:nil?)
    raise 'Reconnect called before connection made'
  end

  connect(@host, @port)
end

.reconnect_sync ⇒ Object



83
84
85
# File 'lib/goat/state-srv.rb', line 83

# (Re)opens the blocking TCP socket used for synchronous messages, using
# the host and port stored on the class.
#
# Any socket left over from a previous call is closed first so that its
# file descriptor is released immediately instead of lingering until the
# garbage collector finalizes it.
#
# @return [TCPSocket] the freshly opened socket (also stored in @sock)
def self.reconnect_sync
  stale = @sock
  if stale && !stale.closed?
    begin
      stale.close
    rescue IOError, SystemCallError
      # Best effort: the old socket is being discarded anyway, so a failure
      # while closing it must not prevent opening the replacement.
    end
  end

  @sock = TCPSocket.open(@host, @port)
end

.send_message(*args) ⇒ Object



108
109
110
111
112
113
114
# File 'lib/goat/state-srv.rb', line 108

# Forwards a message to the currently registered connection.
#
# @param args arguments passed through unchanged to the connection's
#   send_message
# @raise [NoStateSrvConnectionError] when no connection is registered
# @return whatever the connection's send_message returns
def self.send_message(*args)
  raise NoStateSrvConnectionError unless connected?

  connection.send_message(*args)
end

.send_message_sync(msg, failed_last_time = false) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/goat/state-srv.rb', line 91

# Sends +msg+ as one JSON line over the blocking socket and returns the
# next line read back from it.
#
# If the socket turns out to be dead (reset, broken pipe, or EOF while
# reading) the socket is reopened and the message is retried exactly once;
# a second consecutive failure is re-raised to the caller.
#
# @param msg an object responding to to_json
# @param failed_last_time [Boolean] true when this call is already the
#   retry, so a further failure must propagate instead of retrying again
# @return [String] the raw response line as returned by readline
# @raise [Errno::ECONNRESET, Errno::EPIPE, EOFError] when the retry fails too
def self.send_message_sync(msg, failed_last_time=false)
  reconnect_sync unless sync_connection_active?
  @sock.write(msg.to_json + "\n")
  resp = @sock.readline
  Goat.logd("=> #{resp.inspect}") if $verbose
  resp
rescue Errno::ECONNRESET, Errno::EPIPE, EOFError => e
  # almost certainly connection was closed and we didn't notice; writing to
  # such a socket surfaces as EPIPE, reading from it as ECONNRESET/EOFError
  raise if failed_last_time

  Goat.logw "Reinitializing sync connection to state-srv (#{e.inspect})"
  reconnect_sync
  send_message_sync(msg, true)
end

.sync_connection_active? ⇒ Boolean

Returns:

  • (Boolean)


87
88
89
# File 'lib/goat/state-srv.rb', line 87

# Reports whether a blocking socket exists and has not been closed locally.
#
# @return [Boolean, nil] nil when no socket has been opened yet, otherwise
#   the negation of the socket's closed? flag
def self.sync_connection_active?
  sock = @sock
  return sock unless sock

  !sock.closed?
end

Instance Method Details

#close ⇒ Object



158
159
160
# File 'lib/goat/state-srv.rb', line 158

# Convenience alias: closes this connection by delegating to
# close_connection.
#
# @return whatever close_connection returns
def close; close_connection; end

#connection_completed ⇒ Object



118
119
120
121
122
# File 'lib/goat/state-srv.rb', line 118

# Marks this instance as having been connected, logs the event, and
# registers the instance as the class-wide connection.
def connection_completed
  # Remembered so that unbind can tell a dropped connection from one that
  # never opened.
  @was_connected = true
  Goat.logw("Connected to StateSrv")
  StateSrvConnection.connection = self
end

#message_received(msg) ⇒ Object



136
137
138
139
140
141
142
143
144
# File 'lib/goat/state-srv.rb', line 136

# Dispatches one decoded message from the state server.
#
# Only the 'response' entry of +msg+ is inspected. A response of type
# 'update_ack' is reported to StateSrvClient together with its
# 'components' entry; any other type is rejected.
#
# @param msg [Hash] decoded message containing a 'response' entry
# @raise [RuntimeError] when the response type is not 'update_ack'
def message_received(msg)
  response = msg['response']

  # Must be a comparison: an assignment here is always truthy, overwrites
  # the payload's type, and makes the error branch unreachable.
  if response['type'] == 'update_ack'
    StateSrvClient.components_update_completed(response['components'])
  else
    raise "Unknown message type: #{response['type']}"
  end
end

#receive_line(line) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
# File 'lib/goat/state-srv.rb', line 124

# Handles one line of input: decodes it as JSON and passes the result to
# message_received. When the decoded value is an Array, each element is
# passed separately, in order.
#
# @param line [String] one line of JSON text
def receive_line(line)
  parsed = JSON.load(line)

  Goat.logd("=> #{parsed.inspect}") if $verbose

  return message_received(parsed) unless parsed.is_a?(Array)

  parsed.each { |entry| message_received(entry) }
end

#send_message(t, msg, sync = false) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
# File 'lib/goat/state-srv.rb', line 146

# Sends +msg+ tagged with type +t+ to the state server.
#
# The 'type' key is merged into a copy of +msg+. With +sync+ set, the
# payload is handed to the class-level send_message_sync; otherwise it is
# written as one JSON line via send_data.
#
# @param t the value stored under the 'type' key
# @param msg [Hash] message body (not modified)
# @param sync [Boolean] whether to use the class-level synchronous path
# @return the result of send_message_sync or send_data respectively
def send_message(t, msg, sync=false)
  payload = msg.merge('type' => t)

  Goat.logd(">> #{payload.inspect}") if $verbose

  # TODO better way to do this?
  return self.class.send_message_sync(payload) if sync

  send_data(payload.to_json + "\n")
end

#unbind ⇒ Object



162
163
164
165
166
167
168
169
170
171
# File 'lib/goat/state-srv.rb', line 162

# Logs why the connection ended, clears the class-wide connection, and
# schedules a reconnect attempt on a 5-unit EM timer.
#
# The log message depends on whether connection_completed ever ran for
# this instance (tracked in @was_connected).
#
# @return whatever EM.add_timer returns
def unbind
  notice = @was_connected ? "Lost StateSrv connection" : "Couldn't open StateSrv connection"
  Goat.logw(notice)

  StateSrvConnection.connection = nil
  EM.add_timer(5) { self.class.reconnect }
end