Class: Krakow::Producer
- Inherits:
-
Object
- Object
- Krakow::Producer
- Includes:
- Celluloid, Utils::Lazy
- Defined in:
- lib/krakow/producer.rb,
lib/krakow/producer/http.rb
Overview
TCP based producer
Defined Under Namespace
Classes: Http
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
Attributes collapse
-
#connection_options ⇒ Hash
The connection_options attribute.
-
#connection_options? ⇒ TrueClass, FalseClass
Truthiness of the connection_options attribute.
-
#host ⇒ String
The host attribute.
-
#host? ⇒ TrueClass, FalseClass
Truthiness of the host attribute.
-
#port ⇒ [String, Integer]
The port attribute.
-
#port? ⇒ TrueClass, FalseClass
Truthiness of the port attribute.
-
#reconnect_interval ⇒ Integer
The reconnect_interval attribute.
-
#reconnect_interval? ⇒ TrueClass, FalseClass
Truthiness of the reconnect_interval attribute.
-
#reconnect_retries ⇒ Integer
The reconnect_retries attribute.
-
#reconnect_retries? ⇒ TrueClass, FalseClass
Truthiness of the reconnect_retries attribute.
-
#topic ⇒ String
The topic attribute.
-
#topic? ⇒ TrueClass, FalseClass
Truthiness of the topic attribute.
Instance Method Summary collapse
-
#connect ⇒ Object
Establish connection to the configured `host` and `port`.
-
#connected? ⇒ TrueClass, FalseClass
Currently connected to server.
-
#connection_failure(*args) ⇒ TrueClass
Process connection failure and attempt reconnection.
-
#goodbye_my_love! ⇒ Object
Instance destructor.
-
#initialize(args = {}) ⇒ Producer
constructor
A new instance of Producer.
-
#to_s ⇒ String
Stringify object.
-
#write(*message) ⇒ Krakow::FrameType::Error?
Write message to server.
Methods included from Utils::Lazy
Methods included from Utils::Logging
Constructor Details
#initialize(args = {}) ⇒ Producer
Returns a new instance of Producer.
40 41 42 43 44 45 46 |
# File 'lib/krakow/producer.rb', line 40
#
# Build a producer and immediately open its connection.
#
# Caller-supplied :connection_options are shallow-merged over a default of
# empty :features, :config and :options hashes, so each of those keys is
# present (unless explicitly overridden) by the time #connect reads them.
#
# NOTE(review): `arguments` is not defined in this listing; it is presumably
# populated by `super` (Utils::Lazy) from `args` -- confirm.
#
# @param args [Hash] attribute values (host, port, topic, ...)
# @return [Producer]
def initialize(args={})
  super
  defaults = {:features => {}, :config => {}, :options => {}}
  supplied = arguments.fetch(:connection_options, {})
  arguments[:connection_options] = defaults.merge(supplied)
  connect
end
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
22 23 24 |
# File 'lib/krakow/producer.rb', line 22
#
# Read-only accessor for the current connection.
#
# @return [Object, nil] the value of @connection; nil after a connection
#   failure or teardown has cleared it
def connection
  @connection
end
Instance Method Details
#connect ⇒ Object
Establish connection to the configured `host` and `port`
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/krakow/producer.rb', line 51
#
# Establish connection to the configured `host` and `port`.
#
# Connection arguments start from a copy of connection_options[:options]
# (copied so the stored options hash is not mutated), then gain :host and
# :port, plus :features / :features_args when the corresponding
# connection_options entries are set.
#
# Any error raised while connecting is handed to `abort`.
# NOTE(review): `abort` is presumably Celluloid's, which re-raises in the
# caller without crashing the actor -- confirm.
#
# @return [nil]
def connect
  info "Establishing connection to: #{host}:#{port}"
  begin
    con_args = connection_options[:options].dup.tap do |args|
      args[:host] = host
      args[:port] = port
      if(connection_options[:features])
        args[:features] = connection_options[:features]
      end
      if(connection_options[:config])
        # the :config entry is passed to the connection as :features_args
        args[:features_args] = connection_options[:config]
      end
    end
    @connection = Connection.new(con_args)
    connection.init!
    self.link connection
    info "Connection established: #{connection}"
    nil
  rescue => e
    abort e
  end
end
#connected? ⇒ TrueClass, FalseClass
Returns whether the producer is currently connected to the server.
80 81 82 |
# File 'lib/krakow/producer.rb', line 80
#
# Whether the producer currently holds a usable connection: one that is set,
# reports alive?, and reports connected?.
#
# @return [TrueClass, FalseClass]
def connected?
  return false unless connection
  return false unless connection.alive?
  connection.connected? ? true : false
end
#connection_failure(*args) ⇒ TrueClass
Process connection failure and attempt reconnection
87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/krakow/producer.rb', line 87
#
# Process connection failure and attempt reconnection.
#
# Clears the current connection and calls #connect. When that fails, waits
# `reconnect_interval` and tries again, up to `reconnect_retries` additional
# attempts. Once the retries are exhausted the last error is re-raised
# rather than swallowed, so the caller still learns the server is
# unreachable.
#
# @param args [Array] ignored
# @return [TrueClass] once a connection has been re-established
# @raise [StandardError] the last connect error when every attempt failed
def connection_failure(*args)
  @connection = nil
  warn "Connection failure detected for #{host}:#{port}"
  retries_left = reconnect_retries
  begin
    connect
  rescue
    # bare raise re-raises the connect error unchanged once retries run out
    raise unless retries_left > 0
    retries_left -= 1
    warn "Failed to establish connection to #{host}:#{port}. Pausing #{reconnect_interval} before retry"
    sleep reconnect_interval
    retry
  end
  true
end
#connection_options ⇒ Hash
Returns the connection_options attribute.
36 |
# File 'lib/krakow/producer.rb', line 36 attribute :connection_options, Hash, :default => ->{ Hash.new } |
#connection_options? ⇒ TrueClass, FalseClass
Returns truthiness of the connection_options attribute.
36 |
# File 'lib/krakow/producer.rb', line 36 attribute :connection_options, Hash, :default => ->{ Hash.new } |
#goodbye_my_love! ⇒ Object
Instance destructor
102 103 104 105 106 107 108 109 110 |
# File 'lib/krakow/producer.rb', line 102
#
# Instance destructor: terminates the connection if it is still alive,
# forgets it, and logs the teardown.
#
# @return [nil]
def goodbye_my_love!
  debug 'Tearing down producer'
  connection.terminate if connection && connection.alive?
  @connection = nil
  info 'Producer torn down'
  nil
end
#host ⇒ String
Returns the host attribute.
31 |
# File 'lib/krakow/producer.rb', line 31 attribute :host, String, :required => true |
#host? ⇒ TrueClass, FalseClass
Returns truthiness of the host attribute.
31 |
# File 'lib/krakow/producer.rb', line 31 attribute :host, String, :required => true |
#port ⇒ [String, Integer]
Returns the port attribute.
32 |
# File 'lib/krakow/producer.rb', line 32 attribute :port, [String, Integer], :required => true |
#port? ⇒ TrueClass, FalseClass
Returns truthiness of the port attribute.
32 |
# File 'lib/krakow/producer.rb', line 32 attribute :port, [String, Integer], :required => true |
#reconnect_interval ⇒ Integer
Returns the reconnect_interval attribute.
35 |
# File 'lib/krakow/producer.rb', line 35 attribute :reconnect_interval, Integer, :default => 5 |
#reconnect_interval? ⇒ TrueClass, FalseClass
Returns truthiness of the reconnect_interval attribute.
35 |
# File 'lib/krakow/producer.rb', line 35 attribute :reconnect_interval, Integer, :default => 5 |
#reconnect_retries ⇒ Integer
Returns the reconnect_retries attribute.
34 |
# File 'lib/krakow/producer.rb', line 34 attribute :reconnect_retries, Integer, :default => 10 |
#reconnect_retries? ⇒ TrueClass, FalseClass
Returns truthiness of the reconnect_retries attribute.
34 |
# File 'lib/krakow/producer.rb', line 34 attribute :reconnect_retries, Integer, :default => 10 |
#to_s ⇒ String
Returns the stringified object.
75 76 77 |
# File 'lib/krakow/producer.rb', line 75
#
# Stringify object as "<ClassName:object_id {host:port} T:topic>".
#
# @return [String]
def to_s
  endpoint = "#{host}:#{port}"
  "<#{self.class.name}:#{object_id} {#{endpoint}} T:#{topic}>"
end
#topic ⇒ String
Returns the topic attribute.
33 |
# File 'lib/krakow/producer.rb', line 33 attribute :topic, String, :required => true |
#topic? ⇒ TrueClass, FalseClass
Returns truthiness of the topic attribute.
33 |
# File 'lib/krakow/producer.rb', line 33 attribute :topic, String, :required => true |
#write(*message) ⇒ Krakow::FrameType::Error?
Write message to server
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/krakow/producer.rb', line 117
#
# Write message to server.
#
# A single message is transmitted as a Command::Pub; two or more are batched
# into one Command::Mpub. Both are published to the configured `topic`.
#
# Failures are reported through `abort`: an ArgumentError when no message is
# given, an Error::ConnectionUnavailable when there is no live connection.
# NOTE(review): `abort` is presumably Celluloid's, which raises in the
# caller without crashing the actor -- confirm.
#
# @param message [Array<String>] one or more messages to publish
# @return [Krakow::FrameType::Error, nil] result of connection.transmit
def write(*message)
  if(message.empty?)
    abort ArgumentError.new 'Expecting one or more messages to send. None provided.'
  end
  if(connection && connection.alive?)
    if(message.size > 1)
      debug 'Multiple message publish'
      connection.transmit(
        Command::Mpub.new(
          :topic_name => topic,
          :messages => message
        )
      )
    else
      debug 'Single message publish'
      connection.transmit(
        Command::Pub.new(
          :message => message.first,
          :topic_name => topic
        )
      )
    end
  else
    abort Error::ConnectionUnavailable.new 'Remote connection is unavailable!'
  end
end