Class: Rserve::Connection
- Includes:
- Protocol
- Defined in:
- lib/rserve/connection.rb
Overview
Class providing a TCP/IP connection to an Rserve server.
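A minimal usage sketch, assuming the gem is loaded with `require 'rserve'` and that an Rserve instance is reachable on the given host and port. Only methods documented on this page are called. The wrapper name `demo_rserve_session` is hypothetical and exists only so that nothing connects until it is invoked.

```ruby
# Hypothetical helper: wraps a short session so that nothing connects on load.
def demo_rserve_session(hostname = "127.0.0.1", port_number = 6311)
  require 'rserve' # assumption: the gem is installed and loads under this name

  conn = Rserve::Connection.new(:hostname => hostname, :port_number => port_number)
  begin
    conn.assign("greeting", "hello from Ruby")  # string assignment
    conn.void_eval("n <- nchar(greeting)")       # evaluate, discard the result
    conn.eval("n")                               # returns an R expression object
  ensure
    conn.close if conn.connected?
  end
end
```

Calling `demo_rserve_session` returns whatever `eval` produced for `n`; closing in `ensure` keeps the socket from leaking if evaluation raises.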
Defined Under Namespace
Classes: EvalError
Constant Summary
- RserveNotStartedError =
Class.new(StandardError)
- ServerNotAvailableError =
Class.new(StandardError)
- IncorrectServerError =
Class.new(StandardError)
- IncorrectServerVersionError =
Class.new(StandardError)
- IncorrectProtocolError =
Class.new(StandardError)
- NotConnectedError =
Class.new(StandardError)
- AT_plain =
authorization type: plain text
0
- AT_crypt =
authorization type: unix crypt
1
- @@connected_object =
nil
Constants included from Protocol
Protocol::CMD_RESP, Protocol::CMD_SPECIAL_MASK, Protocol::CMD_assignSEXP, Protocol::CMD_attachSession, Protocol::CMD_closeFile, Protocol::CMD_createFile, Protocol::CMD_ctrl, Protocol::CMD_ctrlEval, Protocol::CMD_ctrlShutdown, Protocol::CMD_ctrlSource, Protocol::CMD_detachSession, Protocol::CMD_detachedVoidEval, Protocol::CMD_eval, Protocol::CMD_login, Protocol::CMD_openFile, Protocol::CMD_readFile, Protocol::CMD_removeFile, Protocol::CMD_serAssign, Protocol::CMD_serEEval, Protocol::CMD_serEval, Protocol::CMD_setBufferSize, Protocol::CMD_setEncoding, Protocol::CMD_setSEXP, Protocol::CMD_shutdown, Protocol::CMD_voidEval, Protocol::CMD_writeFile, Protocol::DT_ARRAY, Protocol::DT_BYTESTREAM, Protocol::DT_CHAR, Protocol::DT_DOUBLE, Protocol::DT_INT, Protocol::DT_LARGE, Protocol::DT_SEXP, Protocol::DT_STRING, Protocol::ERROR_DESCRIPTIONS, Protocol::ERR_IOerror, Protocol::ERR_Rerror, Protocol::ERR_accessDenied, Protocol::ERR_auth_failed, Protocol::ERR_conn_broken, Protocol::ERR_ctrl_closed, Protocol::ERR_data_overflow, Protocol::ERR_detach_failed, Protocol::ERR_inv_cmd, Protocol::ERR_inv_par, Protocol::ERR_notOpen, Protocol::ERR_object_too_big, Protocol::ERR_out_of_mem, Protocol::ERR_session_busy, Protocol::ERR_unknownCmd, Protocol::ERR_unsupportedCmd, Protocol::MAX_LONG_SIGNED, Protocol::MAX_LONG_UNSIGNED, Protocol::RESP_ERR, Protocol::RESP_OK
Instance Attribute Summary
-
#auth_req ⇒ Object
readonly
Returns the value of attribute auth_req.
-
#auth_type ⇒ Object
readonly
Returns the value of attribute auth_type.
-
#connected ⇒ Object
readonly
Returns the value of attribute connected.
-
#hostname ⇒ Object
readonly
Returns the value of attribute hostname.
-
#key ⇒ Object
readonly
Returns the value of attribute key.
-
#last_error ⇒ Object
readonly
Returns the value of attribute last_error.
-
#persistent ⇒ Object
writeonly
Sets the attribute persistent.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#port_number ⇒ Object
readonly
Returns the value of attribute port_number.
-
#protocol ⇒ Object
readonly
Returns the value of attribute protocol.
-
#rsrv_version ⇒ Object
readonly
Returns the value of attribute rsrv_version.
-
#rt ⇒ Object
readonly
Returns the value of attribute rt.
-
#s ⇒ Object
readonly
Returns the value of attribute s.
-
#session ⇒ Object
readonly
Returns the value of attribute session.
-
#transfer_charset ⇒ Object
writeonly
Sets the attribute transfer_charset.
Instance Method Summary
-
#assign(sym, ct) ⇒ Object
Assigns a value (a string or an R expression) to a symbol in R.
- #assign_rexp(sym, rexp) ⇒ Object
- #assign_string(sym, ct) ⇒ Object
-
#close ⇒ Object
Closes current connection.
- #connect ⇒ Object
-
#connected? ⇒ Boolean
Check connection state.
-
#detach ⇒ Object
Detaches the session and closes the connection (requires Rserve 0.4+).
-
#eval(cmd) ⇒ Object
Evaluates the given command and retrieves the result.
-
#get_server_version ⇒ Object
Get server version as reported during the handshake.
-
#initialize(opts = Hash.new) ⇒ Connection
constructor
Makes a new local connection. You can provide a hash with options.
-
#parse_eval_response(rp) ⇒ Object
NOT TESTED.
-
#shutdown ⇒ Object
Shutdown remote Rserve.
-
#void_eval(cmd) ⇒ Object
Evaluates the given command but does not fetch the result (useful for assignment operations).
-
#void_eval_detach(cmd) ⇒ Object
Evaluates the given command in detached mode and returns a session object.
Methods included from Protocol
#doubleToRawLongBits, #get_int, #get_int_original, #get_len, #get_long, #get_long_original, #longBitsToDouble, #longBitsToDouble_old, #new_hdr, #set_hdr, #set_int, #set_long
Methods inherited from Engine
#create_reference, #evaluate, #finalize_reference, #get, #get_parent_enviroment, #lock, #new_enviroment, #parse, #parse_and_eval, #resolve_reference, #supports_REPL?, #supports_enviroments?, #supports_references?, #suuports_locking?, #try_lock, #unlock
Constructor Details
#initialize(opts = Hash.new) ⇒ Connection
Makes a new local connection. You can provide a hash with options. The options are analogous to those of the Java client:
:auth_req - Whether authentication is required (false by default)
:transfer_charset - Transfer charset (“UTF-8” by default)
:auth_type - Type of authentication (AT_plain by default)
:hostname - Hostname of Rserve (“127.0.0.1” by default)
:port_number - Port number of Rserve (6311 by default)
:max_tries - Maximum number of tries before giving up (5 by default)
:cmd_init - Command used to start Rserve if it is not running (“R CMD Rserve” by default)
:proc_rserve_ok - Proc that tests whether Rserve is running (by default, calls system with “killall -s 0 Rserve”)
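The defaulting rule behind these options can be sketched in isolation. The constructor uses the pattern `opts.delete(key) || default`, so the snippet below mirrors that with a hypothetical `DEFAULTS` table and `resolve_options` helper (neither is part of the library). Two consequences follow from the pattern: the hash passed in is mutated, and an explicit `nil` or `false` falls back to the default.

```ruby
# Hypothetical defaults table mirroring some of the documented options.
DEFAULTS = {
  :auth_req         => false,
  :transfer_charset => "UTF-8",
  :hostname         => "127.0.0.1",
  :port_number      => 6311,
  :max_tries        => 5,
  :cmd_init         => "R CMD Rserve"
}.freeze

# Resolve caller options the same way as `opts.delete(key) || default`.
# Note: delete removes each recognized key from the hash that was passed in.
def resolve_options(opts = {})
  DEFAULTS.each_with_object({}) do |(key, default), resolved|
    resolved[key] = opts.delete(key) || default
  end
end

opts     = { :hostname => "10.0.0.5", :max_tries => nil }
settings = resolve_options(opts)
# settings[:hostname] is the caller's value; settings[:max_tries] falls back to 5.
```

Callers who want to reuse their options hash afterwards would need to pass a copy (for example `opts.dup`).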
# File 'lib/rserve/connection.rb', line 50
def initialize(opts=Hash.new)
  @auth_req = opts.delete(:auth_req) || false
  @transfer_charset = opts.delete(:transfer_charset) || "UTF-8"
  @auth_type = opts.delete(:auth_type) || AT_plain
  @hostname = opts.delete(:hostname) || "127.0.0.1"
  @port_number = opts.delete(:port_number) || 6311
  @max_tries = opts.delete(:max_tries) || 5
  @cmd_init = opts.delete(:cmd_init) || "R CMD Rserve"
  @proc_rserve_ok = opts.delete(:proc_rserve_ok) || lambda { system "killall -s 0 Rserve" }
  @session = opts.delete(:session) || nil
  @tries = 0
  @connected=false
  if (!@session.nil?)
    @hostname=@session.host
    @port_number=@session.port
  end
  begin
    #puts "Tryin to connect..."
    connect
  rescue Errno::ECONNREFUSED
    if @tries<@max_tries
      @tries+=1
      # Rserve is available?
      if @proc_rserve_ok.call
        # Rserve is available. Incorrect host and/or portname
        raise ServerNotAvailableError, "Rserve started, but not available on #{hostname}:#{port_number}"
        # Rserve not available. We should instanciate it first
      else
        if run_server
          # Wait a moment, please
          sleep(0.25)
          retry
        else
          raise RserveNotStartedError, "Can't start Rserve"
        end
      end
      #puts "Init RServe"
    else
      raise
    end
  end
end
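The connect-and-retry flow of the constructor can be sketched without a socket. In this sketch `connect_with_retry` and its three callable parameters are hypothetical stand-ins for `connect`, `@proc_rserve_ok`, and `run_server`; the plain `RuntimeError`s stand in for the library's error classes.

```ruby
# Hypothetical stand-in for the constructor's connect/retry loop.
# connect, server_running and start_server are callables supplied by the caller.
def connect_with_retry(connect:, server_running:, start_server:, max_tries: 5, wait: 0.25)
  tries = 0
  begin
    connect.call
  rescue Errno::ECONNREFUSED
    raise if tries >= max_tries        # out of attempts: re-raise the refusal
    tries += 1
    # A server process exists but refuses us: wrong host or port.
    raise "server started, but not reachable" if server_running.call
    # No server process: try to launch one, then retry after a short pause.
    raise "can't start server" unless start_server.call
    sleep(wait)
    retry
  end
end

# Simulated server that accepts only the third connection attempt.
attempts = 0
result = connect_with_retry(
  connect:        -> { attempts += 1; raise Errno::ECONNREFUSED if attempts < 3; :connected },
  server_running: -> { false },
  start_server:   -> { true },
  wait:           0
)
```

Here the first two attempts are refused, the simulated server is "started" each time, and the third attempt succeeds.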
Instance Attribute Details
#auth_req ⇒ Object (readonly)
Returns the value of attribute auth_req.
# File 'lib/rserve/connection.rb', line 26
def auth_req
  @auth_req
end
#auth_type ⇒ Object (readonly)
Returns the value of attribute auth_type.
# File 'lib/rserve/connection.rb', line 27
def auth_type
  @auth_type
end
#connected ⇒ Object (readonly)
Returns the value of attribute connected.
# File 'lib/rserve/connection.rb', line 25
def connected
  @connected
end
#hostname ⇒ Object (readonly)
Returns the value of attribute hostname.
# File 'lib/rserve/connection.rb', line 21
def hostname
  @hostname
end
#key ⇒ Object (readonly)
Returns the value of attribute key.
# File 'lib/rserve/connection.rb', line 28
def key
  @key
end
#last_error ⇒ Object (readonly)
Returns the value of attribute last_error.
# File 'lib/rserve/connection.rb', line 24
def last_error
  @last_error
end
#persistent=(value) ⇒ Object (writeonly)
Sets the attribute persistent
# File 'lib/rserve/connection.rb', line 35
def persistent=(value)
  @persistent = value
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
# File 'lib/rserve/connection.rb', line 31
def port
  @port
end
#port_number ⇒ Object (readonly)
Returns the value of attribute port_number.
# File 'lib/rserve/connection.rb', line 22
def port_number
  @port_number
end
#protocol ⇒ Object (readonly)
Returns the value of attribute protocol.
# File 'lib/rserve/connection.rb', line 23
def protocol
  @protocol
end
#rsrv_version ⇒ Object (readonly)
Returns the value of attribute rsrv_version.
# File 'lib/rserve/connection.rb', line 34
def rsrv_version
  @rsrv_version
end
#rt ⇒ Object (readonly)
Returns the value of attribute rt.
# File 'lib/rserve/connection.rb', line 29
def rt
  @rt
end
#s ⇒ Object (readonly)
Returns the value of attribute s.
# File 'lib/rserve/connection.rb', line 30
def s
  @s
end
#session ⇒ Object (readonly)
Returns the value of attribute session.
# File 'lib/rserve/connection.rb', line 32
def session
  @session
end
#transfer_charset=(value) ⇒ Object (writeonly)
Sets the attribute transfer_charset
# File 'lib/rserve/connection.rb', line 33
def transfer_charset=(value)
  @transfer_charset = value
end
Instance Method Details
#assign(sym, ct) ⇒ Object
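Assigns a value to a symbol in R. A String is sent as a string, a REXP is assigned directly, and any other Ruby object is first wrapped with Rserve::REXP::Wrapper.wrap. The symbol is created if it doesn’t exist already.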
# File 'lib/rserve/connection.rb', line 237
def assign(sym, ct)
  raise NotConnectedError if !connected? or rt.nil?
  case ct
  when String
    assign_string(sym,ct)
  when REXP
    assign_rexp(sym,ct)
  else
    assign_rexp(sym, Rserve::REXP::Wrapper.wrap(ct))
  end
end
#assign_rexp(sym, rexp) ⇒ Object
# File 'lib/rserve/connection.rb', line 278
def assign_rexp(sym, rexp)
  r = REXPFactory.new(rexp);
  rl=r.get_binary_length();
  symn=sym.unpack("C*");
  sl=symn.length+1;
  sl=(sl&0xfffffc)+4 if ((sl&3)>0) # make sure the symbol length is divisible by 4
  rq=Array.new(sl+rl+((rl>0xfffff0) ? 12 : 8));
  symn.length.times {|i| rq[i+4]=symn[i]}
  ic=symn.length
  while(ic<sl)
    rq[ic+4]=0;
    ic+=1;
  end # pad with 0
  set_hdr(Rserve::Protocol::DT_STRING,sl,rq,0)
  set_hdr(Rserve::Protocol::DT_SEXP,rl,rq,sl+4);
  r.get_binary_representation(rq, sl+((rl>0xfffff0) ? 12 : 8));
  # puts "ASSIGN RQ: #{rq}" if $DEBUG
  rp=rt.request(:cmd=>Rserve::Protocol::CMD_setSEXP, :cont=>rq)
  if (!rp.nil? and rp.ok?)
    rp
  else
    raise "Assign Failed"
  end
end
#assign_string(sym, ct) ⇒ Object
# File 'lib/rserve/connection.rb', line 249
def assign_string(sym,ct)
  symn = sym.unpack("C*")
  ctn = ct.unpack("C*")
  sl=symn.length+1
  cl=ctn.length+1
  sl=(sl&0xfffffc)+4 if ((sl&3)>0) # make sure the symbol length is divisible by 4
  cl=(cl&0xfffffc)+4 if ((cl&3)>0) # make sure the content length is divisible by 4
  rq=Array.new(sl+4+cl+4)
  symn.length.times {|i| rq[i+4]=symn[i]}
  ic=symn.length
  while (ic<sl)
    rq[ic+4]=0
    ic+=1
  end
  ctn.length.times {|i| rq[i+sl+8]=ctn[i]}
  ic=ctn.length
  while (ic<cl)
    rq[ic+sl+8]=0
    ic+=1
  end
  set_hdr(Rserve::Protocol::DT_STRING,sl,rq,0)
  set_hdr(Rserve::Protocol::DT_STRING,cl,rq,sl+4)
  rp=rt.request(:cmd=>Rserve::Protocol::CMD_setSEXP,:cont=>rq)
  if (!rp.nil? and rp.ok?)
    rp
  else
    raise "Assign Failed"
  end
end
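Both assign helpers pad the symbol and content bytes to a multiple of 4 using `(len & 0xfffffc) + 4 if (len & 3) > 0`. The sketch below isolates that arithmetic; `pad4` and `padded_bytes` are hypothetical names, and the 4-byte headers written by `set_hdr` are left out.

```ruby
# Round a length up to the next multiple of 4, using the same expression as
# the listings. The 0xfffffc mask assumes the length fits in 24 bits.
def pad4(len)
  (len & 3) > 0 ? (len & 0xfffffc) + 4 : len
end

# Bytes of a string plus a terminating 0, zero-padded to a multiple of 4.
def padded_bytes(str)
  bytes  = str.unpack("C*")
  target = pad4(bytes.length + 1)       # +1 reserves the terminating 0
  bytes + [0] * (target - bytes.length)
end

padded_bytes("x")     # => [120, 0, 0, 0]
padded_bytes("abcd")  # => [97, 98, 99, 100, 0, 0, 0, 0]
```

A four-character string grows to eight bytes because the terminating zero pushes it past the first 4-byte boundary.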
#close ⇒ Object
Closes current connection
# File 'lib/rserve/connection.rb', line 147
def close
  if !@s.nil? and !@s.closed?
    @s.close_write
    @s.close_read
  end
  raise "Can't close socket" unless @s.closed?
  @connected=false
  @@connected_object=nil
  true
end
#connect ⇒ Object
# File 'lib/rserve/connection.rb', line 95
def connect
  # On windows, Rserve doesn't allows concurrent connections.
  # So, we must close the last open connection first
  if ON_WINDOWS and !@@connected_object.nil?
    @@connected_object.close
  end
  close if @connected
  @s = TCPSocket::new(@hostname, @port_number)
  @rt=Rserve::Talk.new(@s)
  if @session.nil?
    #puts "Connected"
    # Accept first input
    input=@s.recv(32).unpack("a4a4a4a4a4a4a4a4")
    raise IncorrectServerError, "Handshake failed: Rsrv signature expected, but received [#{input[0]}]" unless input[0]=="Rsrv"
    @rsrv_version=input[1].to_i
    raise IncorrectServerVersionError, "Handshake failed: The server uses more recent protocol than this client." if @rsrv_version>103
    @protocol=input[2]
    raise IncorrectProtocolError, "Handshake failed: unsupported transfer protocol #{@protocol}, I talk only QAP1." if @protocol!="QAP1"
    (3..7).each do |i|
      attr=input[i]
      if (attr=="ARpt")
        if (!auth_req) # this method is only fallback when no other was specified
          auth_req=true
          auth_type=AT_plain
        end
      end
      if (attr=="ARuc")
        auth_req=true
        authType=AT_crypt
      end
      if (attr[0]=='K')
        key=attr[1,3]
      end
    end
  else # we have a session to take care of
    @s.write(@session.key.pack("C*"))
    @rsrv_version=session.rsrv_version
  end
  @connected=true
  @@connected_object=self
  @last_error="OK"
end
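The handshake step reads 32 bytes and splits them into eight 4-character fields: signature, version, protocol, and five attribute slots. A reduced sketch of that parsing follows; `parse_handshake`, `AT_PLAIN`, and `AT_CRYPT` are hypothetical names, `ArgumentError` stands in for the library's error classes, and the upper bound on the version is not checked.

```ruby
AT_PLAIN = 0  # mirrors AT_plain from the constant summary
AT_CRYPT = 1  # mirrors AT_crypt from the constant summary

# Parse the 32-byte identification string into a hash (hypothetical helper).
def parse_handshake(bytes)
  fields = bytes.unpack("a4" * 8)
  raise ArgumentError, "Rsrv signature expected" unless fields[0] == "Rsrv"
  raise ArgumentError, "unsupported protocol #{fields[2]}" unless fields[2] == "QAP1"

  info = { :version => fields[1].to_i, :auth_req => false,
           :auth_type => AT_PLAIN, :key => nil }
  fields[3..7].each do |attr|
    case
    when attr == "ARpt"
      info[:auth_req] = true              # plain text is only the fallback type
    when attr == "ARuc"
      info[:auth_req]  = true
      info[:auth_type] = AT_CRYPT         # unix crypt requested
    when attr.start_with?("K")
      info[:key] = attr[1, 3]             # three-character key
    end
  end
  info
end

info = parse_handshake("Rsrv0103QAP1" + "ARuc" + "Kab1" + "-" * 12)
```

The sketch returns the parsed values explicitly. In Ruby, a bare assignment such as `auth_req=true` inside a method creates a method-local variable, so code that wants to keep these values has to write to instance variables or return them.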
#connected? ⇒ Boolean
Checks the connection state. Note that this state is currently not checked on the spot: if the connection went down because of an outside event, the flag does not reflect it. Returns true if this connection is alive.
# File 'lib/rserve/connection.rb', line 142
def connected?
  @connected
end
#detach ⇒ Object
Detaches the session and closes the connection (requires Rserve 0.4+). The session can only be resumed by calling RSession.attach.
# File 'lib/rserve/connection.rb', line 319
def detach
  raise NotConnectedError if !connected? or rt.nil?
  rp=rt.request(:cmd=>Rserve::Protocol::CMD_detachSession)
  if !rp.nil? and rp.ok?
    s=Rserve::Session.new(self,rp)
    close
    s
  else
    raise "Cannot detach"
  end
end
#eval(cmd) ⇒ Object
Evaluates the given command and retrieves the result.
-
@param cmd command/expression string
-
@return the R expression, or nil if the response carries no data. Raises EvalError if the evaluation fails.
# File 'lib/rserve/connection.rb', line 197
def eval(cmd)
  raise NotConnectedError if !connected? or rt.nil?
  rp=rt.request(:cmd=>Rserve::Protocol::CMD_eval, :cont=>cmd+"\n")
  if !rp.nil? and rp.ok?
    parse_eval_response(rp)
  else
    raise EvalError.new(rp), "eval failed: #{rp.to_s}"
  end
end
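The failure branch uses the two-argument form `raise exception_object, message`, which lets the error carry the failed response alongside a readable message. The sketch below shows the idiom with hypothetical stand-ins: `DemoEvalError` is not the library's EvalError, and `FakeResponse` is not the library's response class, whose definition is not shown on this page.

```ruby
# Hypothetical exception class that keeps the failed response packet.
class DemoEvalError < RuntimeError
  attr_reader :response

  def initialize(response)
    @response = response
    super()
  end
end

# Stand-in for a response packet.
FakeResponse = Struct.new(:code) do
  def ok?
    code == 0
  end

  def to_s
    "error code #{code}"
  end
end

def check!(rp)
  return rp if !rp.nil? && rp.ok?
  # `raise object, message` asks the object for a copy carrying the message;
  # instance variables such as @response are kept on that copy.
  raise DemoEvalError.new(rp), "eval failed: #{rp}"
end

begin
  check!(FakeResponse.new(127))
rescue DemoEvalError => e
  caught = e
end
# caught.message describes the failure; caught.response is the original packet.
```

A rescuer can therefore log `e.message` and still inspect the packet that caused the failure.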
#get_server_version ⇒ Object
Get server version as reported during the handshake.
# File 'lib/rserve/connection.rb', line 158
def get_server_version
  @rsrv_version
end
#parse_eval_response(rp) ⇒ Object
NOT TESTED
# File 'lib/rserve/connection.rb', line 208
def parse_eval_response(rp)
  rxo=0
  pc=rp.cont
  if (rsrv_version>100) # /* since 0101 eval responds correctly by using DT_SEXP type/len header which is 4 bytes long */
    rxo=4
    # we should check parameter type (should be DT_SEXP) and fail if it's not
    if pc.nil?
      raise "Error while processing eval output: SEXP (type #{Rserve::Protocol::DT_SEXP}) expected but nil returned"
    elsif (pc[0]!=Rserve::Protocol::DT_SEXP and pc[0]!=(Rserve::Protocol::DT_SEXP|Rserve::Protocol::DT_LARGE))
      raise "Error while processing eval output: SEXP (type #{Rserve::Protocol::DT_SEXP}) expected but found result type "+pc[0].to_s+"."
    end
    if (pc[0]==(Rserve::Protocol::DT_SEXP|Rserve::Protocol::DT_LARGE))
      rxo=8; # large data need skip of 8 bytes
    end
    # warning: we are not checking or using the length - we assume that only the one SEXP is returned. This is true for the current CMD_eval implementation, but may not be in the future. */
  end
  if pc.length>rxo
    rx=REXPFactory.new;
    rx.parse_REXP(pc, rxo);
    return rx.get_REXP();
  else
    return nil
  end
end
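The offset selection in this method can be sketched on its own. The helper name `sexp_offset` is hypothetical, and the numeric values given for `DT_SEXP` and `DT_LARGE` are assumptions about the Protocol constants, which this page lists only by name.

```ruby
DT_SEXP  = 10  # assumed value of Protocol::DT_SEXP
DT_LARGE = 64  # assumed value of Protocol::DT_LARGE

# Return the offset at which the SEXP payload starts in an eval response.
def sexp_offset(content, server_version)
  return 0 unless server_version > 100     # older servers send no header
  raise ArgumentError, "SEXP expected but nil returned" if content.nil?

  case content[0]
  when DT_SEXP            then 4           # regular 4-byte type/length header
  when DT_SEXP | DT_LARGE then 8           # large payloads use an 8-byte header
  else raise ArgumentError, "SEXP expected but found type #{content[0]}"
  end
end

sexp_offset([DT_SEXP, 8, 0, 0], 103)                        # => 4
sexp_offset([DT_SEXP | DT_LARGE, 0, 0, 0, 0, 0, 0, 0], 103) # => 8
```

As in the listing, the length field of the header is ignored; the sketch only decides how many bytes to skip.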
#shutdown ⇒ Object
Shuts down the remote Rserve. Note that some Rserve instances cannot be shut down from the client side.
# File 'lib/rserve/connection.rb', line 307
def shutdown
  raise NotConnectedError if !connected? or rt.nil?
  rp=rt.request(:cmd=>Rserve::Protocol::CMD_shutdown)
  if !rp.nil? and rp.ok?
    true
  else
    raise "Shutdown failed"
  end
end
#void_eval(cmd) ⇒ Object
Evaluates the given command but does not fetch the result (useful for assignment operations).
-
@param cmd command/expression string
-
@return true on success. Raises EvalError if the evaluation fails.
# File 'lib/rserve/connection.rb', line 164
def void_eval(cmd)
  raise NotConnectedError if !connected? or rt.nil?
  rp=rt.request(:cmd=>Rserve::Protocol::CMD_voidEval, :cont=>cmd+"\n")
  if !rp.nil? and rp.ok?
    true
  else
    raise EvalError.new(rp), "voidEval failed: #{rp.to_s}"
  end
end
#void_eval_detach(cmd) ⇒ Object
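Evaluates the given command in detached mode, without fetching the result, and closes the connection.
-
@param cmd command/expression string
-
@return session object that can be used to attach back to the session once the command has completed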
# File 'lib/rserve/connection.rb', line 180
def void_eval_detach(cmd)
  raise NotConnectedError if !connected? or rt.nil?
  rp=rt.request(:cmd=>Rserve::Protocol::CMD_detachedVoidEval,:cont=>cmd+"\n")
  if rp.nil? or !rp.ok?
    raise EvalError.new(rp), "detached void eval failed : #{rp.to_s}"
  else
    s=Rserve::Session.new(self,rp)
    close
    s
  end
end