Class: Beaneater::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/beaneater/connection.rb

Overview

Represents a connection to a beanstalkd instance.

Constant Summary collapse

DEFAULT_PORT =

Default port value for beanstalk connection

11300

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(address) ⇒ Connection

Initializes new connection.

Examples:

Beaneater::Connection.new('localhost')
Beaneater::Connection.new('localhost:11300')

Parameters:

  • address (String)

    beanstalkd instance address.


34
35
36
37
38
# File 'lib/beaneater/connection.rb', line 34

# Creates a connection object and immediately connects to the server.
#
# @param address [String] beanstalkd instance address, e.g. 'localhost'
#   or 'localhost:11300'
def initialize(address)
  # Stored first: establish_connection reads the address back.
  @address = address
  @connection = establish_connection
  # Guards the connection; see #transmit, which synchronizes on it.
  @mutex = Mutex.new
end

Instance Attribute Details

#address ⇒ String

Returns Beanstalkd server address

Examples:

@conn.address # => "localhost:11300"

Returns:

  • (String)

    returns Beanstalkd server address


22
23
24
# File 'lib/beaneater/connection.rb', line 22

# Returns the beanstalkd server address stored in +@address+.
#
# @return [String] the server address
# @example
#   @conn.address # => "localhost:11300"
def address
  @address
end

#connection ⇒ Object

Returns the value of attribute connection


22
# File 'lib/beaneater/connection.rb', line 22

# Read-only accessors backed by @address, @host, @port and @connection.
attr_reader :address, :host, :port, :connection

#host ⇒ String

Returns Beanstalkd server host

Examples:

@conn.host # => "localhost"

Returns:

  • (String)

    returns Beanstalkd server host


22
# File 'lib/beaneater/connection.rb', line 22

# Read-only accessors backed by @address, @host, @port and @connection.
attr_reader :address, :host, :port, :connection

#port ⇒ Integer

Returns Beanstalkd server port

Examples:

@conn.port # => 11300

Returns:

  • (Integer)

    returns Beanstalkd server port


22
# File 'lib/beaneater/connection.rb', line 22

# Read-only accessors backed by @address, @host, @port and @connection.
attr_reader :address, :host, :port, :connection

Instance Method Details

#close ⇒ Object

Close connection with beanstalkd server.

Examples:

@conn.close

67
68
69
70
# File 'lib/beaneater/connection.rb', line 67

# Closes the connection to the beanstalkd server and forgets it.
#
# Safe to call more than once: when the connection has already been
# closed (and therefore set to nil) this is a no-op instead of raising
# NoMethodError on nil.
#
# @return [nil]
# @example
#   @conn.close
def close
  return unless @connection

  @connection.close
  @connection = nil
end

#config ⇒ Beaneater::Configuration (protected)

Returns configuration options for beaneater

Returns:


136
137
138
# File 'lib/beaneater/connection.rb', line 136

# Returns the configuration object exposed by Beaneater.configuration.
#
# @return [Beaneater::Configuration] configuration options for beaneater
def config
  Beaneater.configuration
end

#establish_connection ⇒ Net::TCPSocket (protected)

Establish a connection based on beanstalk address.

Examples:

establish_connection # connects to the stored address, e.g. 'localhost:3005'

Returns:

  • (Net::TCPSocket)

    connection for specified address.

Raises:


91
92
93
94
95
96
97
98
99
# File 'lib/beaneater/connection.rb', line 91

# Parses the stored address into host and port and opens a TCP socket.
#
# Sets +@host+ and +@port+ as a side effect. The port falls back to
# DEFAULT_PORT when the address has no ":port" part.
#
# @return [TCPSocket] socket connected to the parsed host and port
# @raise [NotConnected] when the connection is refused, or when any other
#   StandardError occurs while parsing the address or connecting
def establish_connection
  host_part, port_part = address.split(':')
  # Integer() (rather than to_i) rejects a malformed port instead of
  # silently turning it into 0.
  @host, @port = host_part, Integer(port_part || DEFAULT_PORT)
  TCPSocket.new @host, @port
rescue Errno::ECONNREFUSED
  raise NotConnected, "Could not connect to '#{@host}:#{@port}'"
rescue StandardError => ex
  # StandardError, not Exception: Interrupt, SystemExit and NoMemoryError
  # must propagate untouched rather than be reported as NotConnected.
  raise NotConnected, "#{ex.class}: #{ex}"
end

#parse_response(cmd, res) ⇒ Array<Hash{String => String, Number}> (protected)

Parses the response and returns the useful beanstalk response. Will read the body if one is indicated by the status.

Examples:

parse_response("delete 56", "DELETED 56\nFOO")
 # => { :body => "FOO", :status => "DELETED", :id => 56, :connection => <Connection>  }

Parameters:

  • cmd (String)

    Beanstalk command transmitted

  • res (String)

    Telnet command response

Returns:

  • (Array<Hash{String => String, Number}>)

    Beanstalk response with status, id, body, and connection

Raises:


112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/beaneater/connection.rb', line 112

# Turns the first line of a beanstalkd reply into a response hash, reading
# the payload from the connection when the status carries one.
#
# For 'OK', 'FOUND' and 'RESERVED' the last token of the status line is
# taken as the payload size. That many bytes are read from the connection,
# followed by the two-byte "\r\n" terminator. 'OK' payloads are decoded
# with YAML; the others go through config.job_parser.
#
# @param cmd [String] Beanstalk command transmitted
# @param res [String] first line of the server response
# @return [Hash] :status and :connection always; :id and :body when present
# @raise [UnexpectedResponse] when the status is one of
#   UnexpectedResponse::ERROR_STATES
# @raise [ExpectedCrlfError] when the payload is not followed by "\r\n"
def parse_response(cmd, res)
  tokens = res.chomp.split(/\s/)
  status = tokens[0]
  raise UnexpectedResponse.from_status(status, cmd) if UnexpectedResponse::ERROR_STATES.include?(status)

  body = nil
  if ['OK','FOUND', 'RESERVED'].include?(status)
    payload = connection.read(tokens[-1].to_i)
    # NOTE(review): YAML.load runs on bytes received from the server;
    # consider YAML.safe_load if the server cannot be fully trusted.
    body = status == 'OK' ? YAML.load(payload) : config.job_parser.call(payload)
    terminator = connection.read(2) # \r\n
    raise ExpectedCrlfError.new('EXPECTED_CRLF', cmd) if terminator != "\r\n"
  end

  # Keys are inserted in a fixed order: status, id, body, connection.
  response = { :status => status }
  id = tokens[1]
  response[:id] = id if id
  response[:body] = body if body
  response[:connection] = self
  response
end

#raise_not_connected! ⇒ Object (protected)

Raises an error to be triggered when the connection has failed

Raises:


142
143
144
# File 'lib/beaneater/connection.rb', line 142

# Raises NotConnected with a message naming the host and port held in
# +@host+ and +@port+.
#
# @raise [NotConnected] always
def raise_not_connected!
  message = "Connection to beanstalk '#{@host}:#{@port}' is closed!"
  raise NotConnected, message
end

#to_s ⇒ Object Also known as: inspect

Returns string representation of the connection.

Examples:

@conn.inspect

77
78
79
# File 'lib/beaneater/connection.rb', line 77

# Builds a short description of the connection from the inspected host
# and port values.
#
# @return [String] e.g. '#<Beaneater::Connection host="localhost" port=11300>'
def to_s
  format('#<Beaneater::Connection host=%s port=%s>', host.inspect, port.inspect)
end

#transmit(command, options = {}) ⇒ Array<Hash{String => String, Number}>

Send commands to beanstalkd server via connection.

Examples:

@conn.transmit('bury 123')

Parameters:

  • options (Hash{String => String, Number})

    Retained for compatibility

  • command (String)

    Beanstalkd command

Returns:

  • (Array<Hash{String => String, Number}>)

    Beanstalkd command response


48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/beaneater/connection.rb', line 48

# Sends a command to the beanstalkd server and returns the parsed reply.
#
# The whole write/read round trip runs inside +@mutex+, so only one
# command is in flight on this connection at a time.
#
# @param command [String] Beanstalkd command, without the trailing "\r\n"
# @param options [Hash] Retained for compatibility; not read by this method
# @return [Object] whatever parse_response returns for the server's reply
# @raise [NotConnected] via raise_not_connected! when there is no
#   connection or the server returns no response line
# @example
#   @conn.transmit('bury 123')
def transmit(command, options={})
  @mutex.synchronize do
    raise_not_connected! unless connection

    # force_encoding mutates its receiver, so re-tag a copy. This leaves the
    # caller's string untouched and also works when that string is frozen.
    command = command.dup.force_encoding('ASCII-8BIT') if command.respond_to?(:force_encoding)
    connection.write(command.to_s + "\r\n")
    res = connection.gets
    # A nil line means the peer closed the socket.
    raise_not_connected! unless res
    parse_response(command, res)
  end
end