Class: Beaneater::Connection
- Inherits:
-
Object
- Object
- Beaneater::Connection
- Defined in:
- lib/beaneater/connection.rb
Overview
Represents a connection to a beanstalkd instance.
Constant Summary collapse
- MAX_RETRIES =
Default number of retries to send a command to a connection
3- DEFAULT_RETRY_INTERVAL =
Default retry interval
1- DEFAULT_PORT =
Default port value for beanstalk connection
11300
Instance Attribute Summary collapse
-
#address ⇒ String
Returns Beanstalkd server address.
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#host ⇒ String
Returns Beanstalkd server host.
-
#port ⇒ Integer
Returns Beanstalkd server port.
- #tube_used ⇒ Object
- #tubes_watched ⇒ Object
Instance Method Summary collapse
- #add_to_watched(tube_name) ⇒ Object
-
#close ⇒ Object
Close connection with beanstalkd server.
-
#config ⇒ Beaneater::Configuration
protected
Returns configuration options for beaneater.
-
#establish_connection ⇒ Net::TCPSocket
protected
Establish a connection based on beanstalk address.
-
#initialize(address) ⇒ Connection
constructor
Initializes new connection.
-
#parse_response(cmd, res) ⇒ Array<Hash{String => String, Number}>
protected
Parses the response and returns the useful beanstalk response.
- #remove_from_watched(tube_name) ⇒ Object
-
#to_s ⇒ Object
(also: #inspect)
Returns string representation of job.
-
#transmit(command, **options) ⇒ Array<Hash{String => String, Number}>
Send commands to beanstalkd server via connection.
Constructor Details
#initialize(address) ⇒ Connection
Initializes new connection.
53 54 55 56 57 58 59 60 61 62 |
# File 'lib/beaneater/connection.rb', line 53 def initialize(address) @address = address || _host_from_env || Beaneater.configuration.beanstalkd_url @mutex = Mutex.new @tube_used = 'default' @tubes_watched = ['default'] establish_connection rescue _raise_not_connected! end |
Instance Attribute Details
#address ⇒ String
Returns Beanstalkd server address
30 31 32 |
# File 'lib/beaneater/connection.rb', line 30 def address @address end |
#connection ⇒ Object
Returns the value of attribute connection.
30 |
# File 'lib/beaneater/connection.rb', line 30 attr_reader :address, :host, :port, :connection |
#host ⇒ String
Returns Beanstalkd server host
30 |
# File 'lib/beaneater/connection.rb', line 30 attr_reader :address, :host, :port, :connection |
#port ⇒ Integer
Returns Beanstalkd server port
30 |
# File 'lib/beaneater/connection.rb', line 30 attr_reader :address, :host, :port, :connection |
#tube_used ⇒ Object
36 |
# File 'lib/beaneater/connection.rb', line 36 attr_accessor :tubes_watched, :tube_used |
#tubes_watched ⇒ Object
36 37 38 |
# File 'lib/beaneater/connection.rb', line 36 def tubes_watched @tubes_watched end |
Instance Method Details
#add_to_watched(tube_name) ⇒ Object
109 110 111 112 |
# File 'lib/beaneater/connection.rb', line 109 def add_to_watched(tube_name) @tubes_watched << tube_name @tubes_watched.uniq end |
#close ⇒ Object
Close connection with beanstalkd server.
92 93 94 95 96 97 |
# File 'lib/beaneater/connection.rb', line 92 def close if @connection @connection.close @connection = nil end end |
#config ⇒ Beaneater::Configuration (protected)
Returns configuration options for beaneater
174 175 176 |
# File 'lib/beaneater/connection.rb', line 174 def config Beaneater.configuration end |
#establish_connection ⇒ Net::TCPSocket (protected)
Establish a connection based on beanstalk address.
127 128 129 130 131 132 133 |
# File 'lib/beaneater/connection.rb', line 127 def establish_connection @address = address.first if address.is_a?(Array) match = address.split(':') @host, @port = match[0], Integer(match[1] || DEFAULT_PORT) @connection = TCPSocket.new @host, @port end |
#parse_response(cmd, res) ⇒ Array<Hash{String => String, Number}> (protected)
Parses the response and returns the useful beanstalk response. Will read the body if one is indicated by the status.
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/beaneater/connection.rb', line 146 def parse_response(cmd, res) status = res.chomp body_values = status.split(/\s/) status = body_values[0] raise UnexpectedResponse.from_status(status, cmd) if UnexpectedResponse::ERROR_STATES.include?(status) body = nil if ['OK','FOUND', 'RESERVED'].include?(status) bytes_size = body_values[-1].to_i raw_body = connection.read(bytes_size) body = if status == 'OK' psych_v4_valid_body = raw_body.gsub(/^(.*?): (.*)$/) { "#{$1}: #{$2.gsub(/[\:\-\~]/, '_')}" } YAML.load(psych_v4_valid_body) else config.job_parser.call(raw_body) end crlf = connection.read(2) # \r\n raise ExpectedCrlfError.new('EXPECTED_CRLF', cmd) if crlf != "\r\n" end id = body_values[1] response = { :status => status } response[:id] = id if id response[:body] = body if body response end |
#remove_from_watched(tube_name) ⇒ Object
114 115 116 |
# File 'lib/beaneater/connection.rb', line 114 def remove_from_watched(tube_name) @tubes_watched.delete(tube_name) end |
#to_s ⇒ Object Also known as: inspect
Returns string representation of job.
104 105 106 |
# File 'lib/beaneater/connection.rb', line 104 def to_s "#<Beaneater::Connection host=#{host.inspect} port=#{port.inspect}>" end |
#transmit(command, **options) ⇒ Array<Hash{String => String, Number}>
Send commands to beanstalkd server via connection.
74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/beaneater/connection.rb', line 74 def transmit(command, **) _with_retry(**.slice(:retry_interval, :init)) do @mutex.synchronize do _raise_not_connected! unless connection command = command.dup.force_encoding('ASCII-8BIT') if command.respond_to?(:force_encoding) connection.write(command.to_s + "\r\n") res = connection.readline parse_response(command, res) end end end |