Class: RelpServer
- Inherits:
-
Relp
show all
- Defined in:
- lib/logstash/util/relp.rb
Constant Summary
Constants inherited
from Relp
Relp::RelpSoftware, Relp::RelpVersion
Instance Method Summary
collapse
Methods inherited from Relp
#frame_read, #frame_write, #server?, #valid_command?
Constructor Details
#initialize(host, port, required_commands = [], ssl_context = nil) ⇒ RelpServer
Returns a new instance of RelpServer.
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
# File 'lib/logstash/util/relp.rb', line 103
# Opens a listening RELP server socket on host:port.
#
# host              - interface/address handed to TCPServer.new
# port              - port handed to TCPServer.new
# required_commands - RELP commands a client must offer during the 'open'
#                     handshake (stored in @required_relp_commands)
# ssl_context       - when given, the TCP listener is wrapped in an
#                     OpenSSL::SSL::SSLServer using this context
#
# Logs and re-raises Errno::EADDRINUSE when the address is already bound.
def initialize(host, port, required_commands = [], ssl_context = nil)
  @logger = Cabin::Channel.get(LogStash)

  # Placeholder truthy value until the real listener is assigned below;
  # presumably what the inherited #server? reports -- confirm in Relp.
  @server = true
  @basic_relp_commands = ['close']
  @required_relp_commands = required_commands

  begin
    @server = TCPServer.new(host, port)
  rescue Errno::EADDRINUSE
    @logger.error("Could not start RELP server: Address in use", :host => host, :port => port)
    raise
  end

  @server = OpenSSL::SSL::SSLServer.new(@server, ssl_context) if ssl_context

  flavour = ssl_context ? 'SSL-enabled ' : ''
  @logger.info("Started #{flavour}RELP Server", :host => host, :port => port)
end
|
Instance Method Details
#accept ⇒ Object
127
128
129
130
131
|
# File 'lib/logstash/util/relp.rb', line 127
# Accepts the next connection on the listening socket.
#
# Returns a two-element array: this server object and the accepted socket.
def accept
  client = @server.accept
  @logger.debug("New socket created")
  [self, client]
end
|
#ack(socket, txnr) ⇒ Object
209
210
211
212
213
214
215
|
# File 'lib/logstash/util/relp.rb', line 209
# Sends a '200 OK' rsp frame for the given transaction number.
#
# socket - connection to write the response to
# txnr   - transaction number being acknowledged
#
# Returns whatever frame_write returns.
def ack(socket, txnr)
  reply = { 'txnr' => txnr, 'command' => 'rsp', 'message' => '200 OK' }
  frame_write(socket, reply)
end
|
#relp_setup_connection(socket) ⇒ Object
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
|
# File 'lib/logstash/util/relp.rb', line 133
# Performs the server side of the RELP 'open' handshake on a new socket.
#
# Reads one frame; it must be an 'open' command whose message carries
# key=value offer lines (relp_version, commands, ...).
#
# socket - freshly accepted client connection
#
# Returns the result of frame_write for the '200 OK' response on success.
#
# Raises InappropriateCommand if the first frame is not 'open'.
# Raises RelpError if the offer has no relp_version.
# Raises InsufficientCommands if the client does not offer every command in
#   @required_relp_commands (a '500' rsp is written first).
# In every failure case the connection is closed via serverclose.
def relp_setup_connection(socket)
  frame = frame_read(socket)

  unless frame['command'] == 'open'
    serverclose(socket)
    raise InappropriateCommand, "#{frame['command']} expecting open"
  end

  # One "key=value" pair per line of the open message; a missing message is
  # treated as an empty offer so it is rejected below with a RELP error.
  offer = Hash[frame['message'].to_s.scan(/^(.*)=(.*)$/)]

  if offer['relp_version'].nil?
    @logger.warn("No relp version specified")
    serverclose(socket)
    raise RelpError, 'No relp_version specified'
  end

  # A client that sends no commands= line simply offers nothing.
  offered_commands = offer['commands'].to_s.split(',')
  missing_commands = @required_relp_commands - offered_commands

  unless missing_commands.empty?
    @logger.warn("Not all required commands are available",
                 :required => @required_relp_commands, :offer => offer['commands'])
    rejection = {
      'txnr'    => frame['txnr'],
      'command' => 'rsp',
      # Built as a single expression: a line that *starts* with `+` is parsed
      # as a separate statement and would silently truncate the message.
      'message' => "500 Required command(s) #{missing_commands.join(',')} not offered"
    }
    frame_write(socket, rejection)
    serverclose(socket)
    raise InsufficientCommands,
          "#{offer['commands']} offered, require #{@required_relp_commands.join(',')}"
  end

  acceptance = {
    'txnr'    => frame['txnr'],
    'command' => 'rsp',
    'message' => "200 OK relp_version=#{RelpVersion}\n" \
                 "relp_software=#{RelpSoftware}\n" \
                 "commands=#{@required_relp_commands.join(',')}"
  }
  frame_write(socket, acceptance)
end
|
#serverclose(socket) ⇒ Object
194
195
196
197
198
199
200
201
202
203
|
# File 'lib/logstash/util/relp.rb', line 194
# Tells the peer the server is closing the connection, then closes the socket.
#
# Writes a 'serverclose' frame with txnr 0. A ConnectionClosed raised by
# frame_write is swallowed (the peer is already gone). The socket is closed
# in every case, including when the write fails, so the local descriptor is
# not left open; errors from close itself are ignored (best effort).
#
# socket - connection to notify and close
#
# Returns nil.
def serverclose(socket)
  farewell = { 'txnr' => 0, 'command' => 'serverclose' }
  begin
    frame_write(socket, farewell)
  rescue ConnectionClosed
    # Peer hung up before we could say goodbye; nothing more to send.
  ensure
    begin
      socket.close
    rescue StandardError
      # Closing is best effort -- e.g. the socket may already be closed.
      nil
    end
  end
  nil
end
|
#shutdown ⇒ Object
205
206
207
|
# File 'lib/logstash/util/relp.rb', line 205
# Closes the listening server socket, ignoring any StandardError raised
# while doing so.
#
# Returns the result of @server.close, or nil if closing raised.
def shutdown
  @server.close
rescue StandardError
  nil
end
|
#syslog_read(socket) ⇒ Object
This does not ack the frame, just reads it
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
|
# File 'lib/logstash/util/relp.rb', line 175
# Reads one frame and returns it if it is a 'syslog' frame. The frame is
# not acknowledged here.
#
# socket - client connection to read from
#
# Returns the frame hash for a 'syslog' command.
#
# Raises ConnectionClosed after answering a 'close' frame with an rsp frame
#   (same txnr) and closing the connection via serverclose.
# Raises InappropriateCommand (after serverclose) for any other command.
def syslog_read(socket)
  incoming = frame_read(socket)

  case incoming['command']
  when 'syslog'
    incoming
  when 'close'
    closing_reply = { 'txnr' => incoming['txnr'], 'command' => 'rsp' }
    frame_write(socket, closing_reply)
    serverclose(socket)
    raise ConnectionClosed
  else
    serverclose(socket)
    raise InappropriateCommand, incoming['command'] + ' expecting syslog'
  end
end
|