Class: Krakow::Producer
- Inherits:
-
Object
- Object
- Krakow::Producer
- Includes:
- Celluloid, Utils::Lazy
- Defined in:
- lib/krakow/producer.rb,
lib/krakow/producer/http.rb
Overview
TCP based producer
Defined Under Namespace
Classes: Http
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#notifier ⇒ Object
readonly
Returns the value of attribute notifier.
Attributes collapse
-
#connection_options ⇒ Hash
The connection_options attribute.
-
#connection_options? ⇒ TrueClass, FalseClass
Truthiness of the connection_options attribute.
-
#host ⇒ String
The host attribute.
-
#host? ⇒ TrueClass, FalseClass
Truthiness of the host attribute.
-
#port ⇒ [String, Integer]
The port attribute.
-
#port? ⇒ TrueClass, FalseClass
Truthiness of the port attribute.
-
#reconnect_interval ⇒ Integer
The reconnect_interval attribute.
-
#reconnect_interval? ⇒ TrueClass, FalseClass
Truthiness of the reconnect_interval attribute.
-
#reconnect_retries ⇒ Integer
The reconnect_retries attribute.
-
#reconnect_retries? ⇒ TrueClass, FalseClass
Truthiness of the reconnect_retries attribute.
-
#topic ⇒ String
The topic attribute.
-
#topic? ⇒ TrueClass, FalseClass
Truthiness of the topic attribute.
Instance Method Summary collapse
-
#connect ⇒ Object
Establish connection to configured ‘host` and `port`.
-
#connected? ⇒ TrueClass, FalseClass
Currently connected to server.
-
#connection_failure(obj, reason) ⇒ TrueClass
Process connection failure and attempt reconnection.
-
#initialize(args = {}) ⇒ Producer
constructor
A new instance of Producer.
-
#producer_cleanup ⇒ Object
Instance destructor.
-
#to_s ⇒ String
Stringify object.
-
#write(*message) ⇒ Krakow::FrameType, TrueClass
Write message to server.
Methods included from Utils::Lazy
Methods included from Utils::Logging
Constructor Details
#initialize(args = {}) ⇒ Producer
Returns a new instance of Producer.
41 42 43 44 45 46 47 |
# File 'lib/krakow/producer.rb', line 41 def initialize(args={}) super arguments[:connection_options] = {:features => {}, :config => {}, :options => {}}.merge( arguments.fetch(:connection_options, {}) ) connect end |
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
22 23 24 |
# File 'lib/krakow/producer.rb', line 22 def connection @connection end |
#notifier ⇒ Object (readonly)
Returns the value of attribute notifier.
23 24 25 |
# File 'lib/krakow/producer.rb', line 23 def notifier @notifier end |
Instance Method Details
#connect ⇒ Object
Establish connection to configured ‘host` and `port`
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/krakow/producer.rb', line 52 def connect @connecting = true info "Establishing connection to: #{host}:#{port}" begin con_args = [:options].dup.tap do |args| args[:host] = host args[:port] = port if([:features]) args[:features] = [:features] end if([:config]) args[:features_args] = [:config] end end @connection = Connection.new(con_args) @connection.init! self.link @connection info "Connection established: #{@connection}" nil rescue => e abort e end @connecting = false end |
#connected? ⇒ TrueClass, FalseClass
Returns currently connected to server.
83 84 85 86 87 88 89 90 91 92 |
# File 'lib/krakow/producer.rb', line 83 def connected? begin !!(!@connecting && connection && connection.alive? && connection.connected?) rescue Celluloid::DeadActorError false end end |
#connection_failure(obj, reason) ⇒ TrueClass
Process connection failure and attempt reconnection
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/krakow/producer.rb', line 97 def connection_failure(obj, reason) if(obj == connection && !reason.nil?) begin @connection = nil warn "Connection failure detected for #{host}:#{port} - #{reason}" obj.terminate if obj.alive? connect rescue => reason warn "Failed to establish connection to #{host}:#{port}. Pausing #{reconnect_interval} before retry" sleep reconnect_interval retry end end true end |
#connection_options ⇒ Hash
Returns the connection_options attribute.
37 |
# File 'lib/krakow/producer.rb', line 37 attribute :connection_options, Hash, :default => ->{ Hash.new } |
#connection_options? ⇒ TrueClass, FalseClass
Returns truthiness of the connection_options attribute.
37 |
# File 'lib/krakow/producer.rb', line 37 attribute :connection_options, Hash, :default => ->{ Hash.new } |
#host ⇒ String
Returns the host attribute.
32 |
# File 'lib/krakow/producer.rb', line 32 attribute :host, String, :required => true |
#host? ⇒ TrueClass, FalseClass
Returns truthiness of the host attribute.
32 |
# File 'lib/krakow/producer.rb', line 32 attribute :host, String, :required => true |
#port ⇒ [String, Integer]
Returns the port attribute.
33 |
# File 'lib/krakow/producer.rb', line 33 attribute :port, [String, Integer], :required => true |
#port? ⇒ TrueClass, FalseClass
Returns truthiness of the port attribute.
33 |
# File 'lib/krakow/producer.rb', line 33 attribute :port, [String, Integer], :required => true |
#producer_cleanup ⇒ Object
Instance destructor
115 116 117 118 119 120 121 122 123 |
# File 'lib/krakow/producer.rb', line 115 def producer_cleanup debug 'Tearing down producer' if(connection && connection.alive?) connection.terminate end @connection = nil info 'Producer torn down' nil end |
#reconnect_interval ⇒ Integer
Returns the reconnect_interval attribute.
36 |
# File 'lib/krakow/producer.rb', line 36 attribute :reconnect_interval, Integer, :default => 5 |
#reconnect_interval? ⇒ TrueClass, FalseClass
Returns truthiness of the reconnect_interval attribute.
36 |
# File 'lib/krakow/producer.rb', line 36 attribute :reconnect_interval, Integer, :default => 5 |
#reconnect_retries ⇒ Integer
Returns the reconnect_retries attribute.
35 |
# File 'lib/krakow/producer.rb', line 35 attribute :reconnect_retries, Integer, :default => 10 |
#reconnect_retries? ⇒ TrueClass, FalseClass
Returns truthiness of the reconnect_retries attribute.
35 |
# File 'lib/krakow/producer.rb', line 35 attribute :reconnect_retries, Integer, :default => 10 |
#to_s ⇒ String
Returns stringify object.
78 79 80 |
# File 'lib/krakow/producer.rb', line 78 def to_s "<#{self.class.name}:#{object_id} {#{host}:#{port}} T:#{topic}>" end |
#topic ⇒ String
Returns the topic attribute.
34 |
# File 'lib/krakow/producer.rb', line 34 attribute :topic, String, :required => true |
#topic? ⇒ TrueClass, FalseClass
Returns truthiness of the topic attribute.
34 |
# File 'lib/krakow/producer.rb', line 34 attribute :topic, String, :required => true |
#write(*message) ⇒ Krakow::FrameType, TrueClass
if connection response wait is set to 0, writes will return a ‘true` value on completion
Write message to server
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/krakow/producer.rb', line 132 def write(*) if(.empty?) abort ArgumentError.new 'Expecting one or more messages to send. None provided.' end begin if(.size > 1) debug 'Multiple message publish' connection.transmit( Command::Mpub.new( :topic_name => topic, :messages => ) ) else debug 'Single message publish' connection.transmit( Command::Pub.new( :message => .first, :topic_name => topic ) ) end rescue Celluloid::Task::TerminatedError abort Error::ConnectionUnavailable.new 'Connection is currently unavailable' rescue => e abort e end end |