Class: Krakow::Producer

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Utils::Lazy
Defined in:
lib/krakow/producer.rb,
lib/krakow/producer/http.rb

Overview

TCP based producer

Defined Under Namespace

Classes: Http

Instance Attribute Summary collapse

Attributes collapse

Instance Method Summary collapse

Methods included from Utils::Lazy

included

Methods included from Utils::Logging

level=, #log

Constructor Details

#initialize(args = {}) ⇒ Producer

Returns a new instance of Producer.



40
41
42
43
44
45
46
# File 'lib/krakow/producer.rb', line 40

def initialize(args={})
  super
  arguments[:connection_options] = {:features => {}, :config => {}, :options => {}}.merge(
    arguments.fetch(:connection_options, {})
  )
  connect
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



22
23
24
# File 'lib/krakow/producer.rb', line 22

def connection
  @connection
end

Instance Method Details

#connectObject

Establish connection to configured ‘host` and `port`

Returns:

  • nil



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/krakow/producer.rb', line 51

def connect
  info "Establishing connection to: #{host}:#{port}"
  begin
    con_args = connection_options[:options].dup.tap do |args|
      args[:host] = host
      args[:port] = port
      if(connection_options[:features])
        args[:features] = connection_options[:features]
      end
      if(connection_options[:config])
        args[:features_args] = connection_options[:config]
      end
    end
    @connection = Connection.new(con_args)
    connection.init!
    self.link connection
    info "Connection established: #{connection}"
    nil
  rescue => e
    abort e
  end
end

#connected?TrueClass, FalseClass

Returns currently connected to server.

Returns:

  • (TrueClass, FalseClass)

    currently connected to server



80
81
82
# File 'lib/krakow/producer.rb', line 80

def connected?
  !!(connection && connection.alive? && connection.connected?)
end

#connection_failure(*args) ⇒ TrueClass

Process connection failure and attempt reconnection

Returns:

  • (TrueClass)


87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/krakow/producer.rb', line 87

def connection_failure(*args)
  @connection = nil
  begin
    warn "Connection failure detected for #{host}:#{port}"
    connect
  rescue => e
    warn "Failed to establish connection to #{host}:#{port}. Pausing #{reconnect_interval} before retry"
    sleep reconnect_interval
    connect
  end
  true
end

#connection_optionsHash

Returns the connection_options attribute.

Returns:

  • (Hash)

    the connection_options attribute



36
# File 'lib/krakow/producer.rb', line 36

attribute :connection_options, Hash, :default => ->{ Hash.new }

#connection_options?TrueClass, FalseClass

Returns truthiness of the connection_options attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the connection_options attribute



36
# File 'lib/krakow/producer.rb', line 36

attribute :connection_options, Hash, :default => ->{ Hash.new }

#goodbye_my_love!Object

Instance destructor

Returns:

  • nil



102
103
104
105
106
107
108
109
110
# File 'lib/krakow/producer.rb', line 102

def goodbye_my_love!
  debug 'Tearing down producer'
  if(connection && connection.alive?)
    connection.terminate
  end
  @connection = nil
  info 'Producer torn down'
  nil
end

#hostString

Returns the host attribute.

Returns:

  • (String)

    the host attribute



31
# File 'lib/krakow/producer.rb', line 31

attribute :host, String, :required => true

#host?TrueClass, FalseClass

Returns truthiness of the host attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the host attribute



31
# File 'lib/krakow/producer.rb', line 31

attribute :host, String, :required => true

#port[String, Integer]

Returns the port attribute.

Returns:

  • ([String, Integer])

    the port attribute



32
# File 'lib/krakow/producer.rb', line 32

attribute :port, [String, Integer], :required => true

#port?TrueClass, FalseClass

Returns truthiness of the port attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the port attribute



32
# File 'lib/krakow/producer.rb', line 32

attribute :port, [String, Integer], :required => true

#reconnect_intervalInteger

Returns the reconnect_interval attribute.

Returns:

  • (Integer)

    the reconnect_interval attribute



35
# File 'lib/krakow/producer.rb', line 35

attribute :reconnect_interval, Integer, :default => 5

#reconnect_interval?TrueClass, FalseClass

Returns truthiness of the reconnect_interval attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the reconnect_interval attribute



35
# File 'lib/krakow/producer.rb', line 35

attribute :reconnect_interval, Integer, :default => 5

#reconnect_retriesInteger

Returns the reconnect_retries attribute.

Returns:

  • (Integer)

    the reconnect_retries attribute



34
# File 'lib/krakow/producer.rb', line 34

attribute :reconnect_retries, Integer, :default => 10

#reconnect_retries?TrueClass, FalseClass

Returns truthiness of the reconnect_retries attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the reconnect_retries attribute



34
# File 'lib/krakow/producer.rb', line 34

attribute :reconnect_retries, Integer, :default => 10

#to_sString

Returns stringify object.

Returns:

  • (String)

    stringify object



75
76
77
# File 'lib/krakow/producer.rb', line 75

def to_s
  "<#{self.class.name}:#{object_id} {#{host}:#{port}} T:#{topic}>"
end

#topicString

Returns the topic attribute.

Returns:

  • (String)

    the topic attribute



33
# File 'lib/krakow/producer.rb', line 33

attribute :topic, String, :required => true

#topic?TrueClass, FalseClass

Returns truthiness of the topic attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the topic attribute



33
# File 'lib/krakow/producer.rb', line 33

attribute :topic, String, :required => true

#write(*message) ⇒ Krakow::FrameType::Error?

Write message to server

Parameters:

  • message (String)

    message to write

Returns:

Raises:



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/krakow/producer.rb', line 117

def write(*message)
  if(message.empty?)
    abort ArgumentError.new 'Expecting one or more messages to send. None provided.'
  end
  if(connection && connection.alive?)
    if(message.size > 1)
      debug 'Multiple message publish'
      connection.transmit(
        Command::Mpub.new(
          :topic_name => topic,
          :messages => message
        )
      )
    else
      debug 'Single message publish'
      connection.transmit(
        Command::Pub.new(
          :message => message.first,
          :topic_name => topic
        )
      )
    end
  else
    abort Error::ConnectionUnavailable.new 'Remote connection is unavailable!'
  end
end