Class: EM::Mongo::EMConnection
- Inherits:
-
Connection
- Object
- Connection
- EM::Mongo::EMConnection
- Includes:
- Deferrable
- Defined in:
- lib/em-mongo/connection.rb
Defined Under Namespace
Classes: Error
Constant Summary
- MAX_RETRIES =
5- RESERVED =
0- STANDARD_HEADER_SIZE =
16- RESPONSE_HEADER_SIZE =
20
Instance Attribute Summary (collapse)
-
- (Object) connection
readonly
Returns the value of attribute connection.
Class Method Summary (collapse)
Instance Method Summary (collapse)
-
- (Object) build_last_error_message(message, db_name, opts)
Connection#send_message_with_safe_check.
- - (Object) close
- - (Boolean) connected?
- - (Object) connection_completed
-
- (EMConnection) initialize(options = {})
constructor
EM hooks.
- - (Object) message_headers(operation, request_id, message)
- - (Boolean) message_received?(buffer)
- - (Object) new_request_id
- - (Object) next_response
- - (Object) peek_size(buffer)
-
- (Object) prepare_message(op, message, options = {})
MongoDB Commands.
- - (Object) prepare_safe_message(message, options)
- - (Object) receive_data(data)
- - (Object) remaining_bytes(buffer)
- - (Boolean) responses_pending?
- - (Object) send_command(op, message, options = {}, &cb)
- - (Boolean) slave_ok?
- - (Object) unbind
Constructor Details
- (EMConnection) initialize(options = {})
EM hooks
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/em-mongo/connection.rb', line 109 def initialize(={}) @request_id = 0 @retries = 0 @responses = {} @is_connected = false @host = [:host] || DEFAULT_IP @port = [:port] || DEFAULT_PORT @on_unbind = [:unbind_cb] || proc {} @reconnect_in = [:reconnect_in]|| false @slave_ok = [:slave_ok] || false @on_close = proc { raise Error, "failure with mongodb server #{@host}:#{@port}" } timeout [:timeout] if [:timeout] errback { @on_close.call } end |
Instance Attribute Details
- (Object) connection (readonly)
Returns the value of attribute connection
45 46 47 |
# File 'lib/em-mongo/connection.rb', line 45 def connection @connection end |
Class Method Details
+ (Object) connect(host = DEFAULT_IP, port = DEFAULT_PORT, timeout = nil, opts = nil)
127 128 129 130 |
# File 'lib/em-mongo/connection.rb', line 127 def self.connect(host = DEFAULT_IP, port = DEFAULT_PORT, timeout = nil, opts = nil) opt = {:host => host, :port => port, :timeout => timeout, :reconnect_in => false}.merge(opts) EM.connect(host, port, self, opt) end |
Instance Method Details
- (Object) build_last_error_message(message, db_name, opts)
Connection#send_message_with_safe_check.
Because it modifies message by reference, we don't need to return it.
218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/em-mongo/connection.rb', line 218 def (, db_name, opts) .put_int(0) BSON::BSON_RUBY.serialize_cstr(, "#{db_name}.$cmd") .put_int(0) .put_int(-1) cmd = BSON::OrderedHash.new cmd[:getlasterror] = 1 if opts.is_a?(Hash) opts.assert_valid_keys(:w, :wtimeout, :fsync) cmd.merge!(opts) end .put_binary(BSON::BSON_CODER.serialize(cmd, false).to_s) nil end |
- (Object) close
205 206 207 208 209 210 211 212 |
# File 'lib/em-mongo/connection.rb', line 205 def close @on_close = proc { yield if block_given? } if @responses.empty? close_connection_after_writing else @close_pending = true end end |
- (Boolean) connected?
51 52 53 |
# File 'lib/em-mongo/connection.rb', line 51 def connected? @is_connected end |
- (Object) connection_completed
132 133 134 135 136 137 |
# File 'lib/em-mongo/connection.rb', line 132 def connection_completed @buffer = BSON::ByteBuffer.new @is_connected = true @retries = 0 succeed end |
- (Object) message_headers(operation, request_id, message)
88 89 90 91 92 93 94 95 |
# File 'lib/em-mongo/connection.rb', line 88 def (operation, request_id, ) headers = BSON::ByteBuffer.new headers.put_int(16 + .size) headers.put_int(request_id) headers.put_int(0) headers.put_int(operation) headers end |
- (Boolean) message_received?(buffer)
139 140 141 142 |
# File 'lib/em-mongo/connection.rb', line 139 def (buffer) x= remaining_bytes(@buffer) x > STANDARD_HEADER_SIZE && x >= peek_size(@buffer) end |
- (Object) new_request_id
55 56 57 |
# File 'lib/em-mongo/connection.rb', line 55 def new_request_id @request_id += 1 end |
- (Object) next_response
178 179 180 |
# File 'lib/em-mongo/connection.rb', line 178 def next_response() ServerResponse.new(@buffer, self) end |
- (Object) peek_size(buffer)
148 149 150 151 152 153 |
# File 'lib/em-mongo/connection.rb', line 148 def peek_size(buffer) position= buffer.position size= buffer.get_int buffer.position= position size end |
- (Object) prepare_message(op, message, options = {})
MongoDB Commands
65 66 67 68 69 70 |
# File 'lib/em-mongo/connection.rb', line 65 def (op, , ={}) req_id = new_request_id .prepend!((op, req_id, )) req_id = (,) if [:safe] [req_id, .to_s] end |
- (Object) prepare_safe_message(message, options)
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/em-mongo/connection.rb', line 72 def (,) db_name = [:db_name] unless db_name raise( ArgumentError, "You must include the :db_name option when :safe => true" ) end last_error_params = [:last_error_params] || false = BSON::ByteBuffer.new (, db_name, last_error_params) last_error_id = new_request_id .prepend!((EM::Mongo::OP_QUERY, last_error_id, )) .append!() last_error_id end |
- (Object) receive_data(data)
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/em-mongo/connection.rb', line 155 def receive_data(data) @buffer.append!(BSON::ByteBuffer.new(data.unpack('C*'))) @buffer.rewind while (@buffer) response = next_response callback = @responses.delete(response.response_to) callback.call(response) if callback end if @buffer.more? remaining_bytes= @buffer.size-@buffer.position @buffer = BSON::ByteBuffer.new(@buffer.get(remaining_bytes)) @buffer.rewind else @buffer.clear end close_connection if @close_pending && @responses.empty? end |
- (Object) remaining_bytes(buffer)
144 145 146 |
# File 'lib/em-mongo/connection.rb', line 144 def remaining_bytes(buffer) buffer.size-buffer.position end |
- (Boolean) responses_pending?
47 48 49 |
# File 'lib/em-mongo/connection.rb', line 47 def responses_pending? @responses.size >= 1 end |
- (Object) send_command(op, message, options = {}, &cb)
97 98 99 100 101 102 103 104 105 106 |
# File 'lib/em-mongo/connection.rb', line 97 def send_command(op, , ={}, &cb) request_id, buffer = (op, , ) callback do send_data buffer end @responses[request_id] = cb if cb request_id end |
- (Boolean) slave_ok?
59 60 61 |
# File 'lib/em-mongo/connection.rb', line 59 def slave_ok? @slave_ok end |
- (Object) unbind
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/em-mongo/connection.rb', line 182 def unbind if @is_connected @responses.values.each { |resp| resp.call(:disconnected) } @request_id = 0 @responses = {} end @is_connected = false set_deferred_status(nil) if @reconnect_in && @retries < MAX_RETRIES EM.add_timer(@reconnect_in) { reconnect(@host, @port) } elsif @on_unbind @on_unbind.call else raise "Connection to Mongo Lost" end @retries += 1 end |