Class: EM::Mongo::EMConnection

Inherits:
Connection
  • Object
show all
Includes:
Deferrable
Defined in:
lib/em-mongo/connection.rb

Defined Under Namespace

Classes: Error

Constant Summary

MAX_RETRIES =
5
RESERVED =
0
STANDARD_HEADER_SIZE =
16
RESPONSE_HEADER_SIZE =
20

Instance Attribute Summary (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (EMConnection) initialize(options = {})

EM hooks



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/em-mongo/connection.rb', line 109

def initialize(options={})
  @request_id    = 0
  @retries       = 0
  @responses     = {}
  @is_connected  = false
  @host          = options[:host]        || DEFAULT_IP
  @port          = options[:port]        || DEFAULT_PORT
  @on_unbind     = options[:unbind_cb]   || proc {}
  @reconnect_in  = options[:reconnect_in]|| false
  @slave_ok      = options[:slave_ok]    || false

  @on_close = proc {
    raise Error, "failure with mongodb server #{@host}:#{@port}"
  }
  timeout options[:timeout] if options[:timeout]
  errback { @on_close.call }
end

Instance Attribute Details

- (Object) connection (readonly)

Returns the value of attribute connection



45
46
47
# File 'lib/em-mongo/connection.rb', line 45

def connection
  @connection
end

Class Method Details

+ (Object) connect(host = DEFAULT_IP, port = DEFAULT_PORT, timeout = nil, opts = nil)



127
128
129
130
# File 'lib/em-mongo/connection.rb', line 127

def self.connect(host = DEFAULT_IP, port = DEFAULT_PORT, timeout = nil, opts = nil)
  opt = {:host => host, :port => port, :timeout => timeout, :reconnect_in => false}.merge(opts)
  EM.connect(host, port, self, opt)
end

Instance Method Details

- (Object) build_last_error_message(message, db_name, opts)

Connection#send_message_with_safe_check.

Because it modifies message by reference, we don't need to return it.



218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/em-mongo/connection.rb', line 218

def build_last_error_message(message, db_name, opts)
  message.put_int(0)
  BSON::BSON_RUBY.serialize_cstr(message, "#{db_name}.$cmd")
  message.put_int(0)
  message.put_int(-1)
  cmd = BSON::OrderedHash.new
  cmd[:getlasterror] = 1
  if opts.is_a?(Hash)
    opts.assert_valid_keys(:w, :wtimeout, :fsync)
    cmd.merge!(opts)
  end
  message.put_binary(BSON::BSON_CODER.serialize(cmd, false).to_s)
  nil
end

- (Object) close



205
206
207
208
209
210
211
212
# File 'lib/em-mongo/connection.rb', line 205

def close
  @on_close = proc { yield if block_given? }
  if @responses.empty?
    close_connection_after_writing
  else
    @close_pending = true
  end
end

- (Boolean) connected?

Returns:

  • (Boolean)


51
52
53
# File 'lib/em-mongo/connection.rb', line 51

def connected?
  @is_connected
end

- (Object) connection_completed



132
133
134
135
136
137
# File 'lib/em-mongo/connection.rb', line 132

def connection_completed
  @buffer = BSON::ByteBuffer.new
  @is_connected = true
  @retries = 0
  succeed
end

- (Object) message_headers(operation, request_id, message)



88
89
90
91
92
93
94
95
# File 'lib/em-mongo/connection.rb', line 88

def message_headers(operation, request_id, message)
  headers = BSON::ByteBuffer.new
  headers.put_int(16 + message.size)
  headers.put_int(request_id)
  headers.put_int(0)
  headers.put_int(operation)
  headers
end

- (Boolean) message_received?(buffer)

Returns:

  • (Boolean)


139
140
141
142
# File 'lib/em-mongo/connection.rb', line 139

def message_received?(buffer)
  x= remaining_bytes(@buffer)
  x > STANDARD_HEADER_SIZE && x >= peek_size(@buffer)
end

- (Object) new_request_id



55
56
57
# File 'lib/em-mongo/connection.rb', line 55

def new_request_id
  @request_id += 1
end

- (Object) next_response



178
179
180
# File 'lib/em-mongo/connection.rb', line 178

def next_response()
  ServerResponse.new(@buffer, self)
end

- (Object) peek_size(buffer)



148
149
150
151
152
153
# File 'lib/em-mongo/connection.rb', line 148

def peek_size(buffer)
  position= buffer.position
  size= buffer.get_int
  buffer.position= position
  size
end

- (Object) prepare_message(op, message, options = {})

MongoDB Commands



65
66
67
68
69
70
# File 'lib/em-mongo/connection.rb', line 65

def prepare_message(op, message, options={})
  req_id = new_request_id
  message.prepend!(message_headers(op, req_id, message))
  req_id = prepare_safe_message(message,options) if options[:safe]
  [req_id, message.to_s]
end

- (Object) prepare_safe_message(message, options)



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/em-mongo/connection.rb', line 72

def prepare_safe_message(message,options)
    db_name = options[:db_name]
    unless db_name
      raise( ArgumentError, "You must include the :db_name option when :safe => true" )
    end

    last_error_params = options[:last_error_params] || false
    last_error_message = BSON::ByteBuffer.new

    build_last_error_message(last_error_message, db_name, last_error_params)
    last_error_id = new_request_id
    last_error_message.prepend!(message_headers(EM::Mongo::OP_QUERY, last_error_id, last_error_message))
    message.append!(last_error_message)
    last_error_id
end

- (Object) receive_data(data)



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/em-mongo/connection.rb', line 155

def receive_data(data)

  @buffer.append!(BSON::ByteBuffer.new(data.unpack('C*')))

  @buffer.rewind
  while message_received?(@buffer)
    response = next_response
    callback = @responses.delete(response.response_to)
    callback.call(response) if callback
  end

  if @buffer.more?
    remaining_bytes= @buffer.size-@buffer.position
    @buffer = BSON::ByteBuffer.new(@buffer.get(remaining_bytes))
    @buffer.rewind
  else
    @buffer.clear
  end

  close_connection if @close_pending && @responses.empty?

end

- (Object) remaining_bytes(buffer)



144
145
146
# File 'lib/em-mongo/connection.rb', line 144

def remaining_bytes(buffer)
  buffer.size-buffer.position
end

- (Boolean) responses_pending?

Returns:

  • (Boolean)


47
48
49
# File 'lib/em-mongo/connection.rb', line 47

def responses_pending?
  @responses.size >= 1
end

- (Object) send_command(op, message, options = {}, &cb)



97
98
99
100
101
102
103
104
105
106
# File 'lib/em-mongo/connection.rb', line 97

def send_command(op, message, options={}, &cb)
  request_id, buffer = prepare_message(op, message, options)

  callback do
    send_data buffer
  end

  @responses[request_id] = cb if cb
  request_id
end

- (Boolean) slave_ok?

Returns:

  • (Boolean)


59
60
61
# File 'lib/em-mongo/connection.rb', line 59

def slave_ok?
  @slave_ok
end

- (Object) unbind



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/em-mongo/connection.rb', line 182

def unbind
  if @is_connected
    @responses.values.each { |resp| resp.call(:disconnected) }

    @request_id = 0
    @responses = {}
  end

  @is_connected = false

  set_deferred_status(nil)

  if @reconnect_in && @retries < MAX_RETRIES
    EM.add_timer(@reconnect_in) { reconnect(@host, @port) }
  elsif @on_unbind
    @on_unbind.call
  else
    raise "Connection to Mongo Lost"
  end

  @retries += 1
end