Module: Kafka::CLI
Overview
:nodoc: all
Instance Method Summary
- #config ⇒ Object
- #help ⇒ Object
- #option_parser ⇒ Object
- #parse_args(args = ARGV) ⇒ Object
- #program_name(pn = $0) ⇒ Object
- #publish(options) ⇒ Object
- #publish! ⇒ Object
- #publish? ⇒ Boolean
- #publish_loop(producer) ⇒ Object
- #push(options, message) ⇒ Object
- #read_env(env = ENV) ⇒ Object
- #read_input ⇒ Object
- #string_to_compression(meth) ⇒ Object
- #subscribe(options) ⇒ Object
- #subscribe! ⇒ Object
- #subscribe? ⇒ Boolean
- #validate_config ⇒ Object
Instance Method Details
#config ⇒ Object
67 68 69 |
# File 'lib/kafka/cli.rb', line 67
# Returns the option hash shared by the CLI helpers, creating it on first use.
#
# The hash is memoized in @config. A fresh hash starts with a single
# :compression entry, obtained from string_to_compression("no").
#
# @return [Hash] the memoized configuration hash
def config
  return @config if @config

  @config = {:compression => string_to_compression("no")}
end
#help ⇒ Object
71 72 73 |
# File 'lib/kafka/cli.rb', line 71
# Renders the usage text for the command-line tool.
#
# @return [Object] the result of calling +to_s+ on a freshly built option_parser
def help
  parser = option_parser
  parser.to_s
end
#option_parser ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/kafka/cli.rb', line 75
# Builds the OptionParser used by the command-line tools.
#
# Each switch stores its value in +config+:
# * -h / --host        -> config[:host]
# * -p / --port        -> config[:port] (converted with +to_i+)
# * -t / --topic       -> config[:topic]
# * -c / --compression -> config[:compression] (via string_to_compression)
# * -m / --message     -> config[:message]
# * --help             -> config[:help] = true
#
# The compression and message switches are registered only when +publish?+
# is true. A new parser is built on every call.
#
# @return [OptionParser]
def option_parser
  OptionParser.new do |opts|
    opts.banner = "Usage: #{program_name} [options]"
    opts.separator ""

    # -h is bound to the host here, so help is only reachable as --help.
    opts.on("-h","--host HOST", "Set the kafka hostname") do |h|
      config[:host] = h
    end

    opts.on("-p", "--port PORT", "Set the kafka port") do |port|
      config[:port] = port.to_i
    end

    opts.on("-t", "--topic TOPIC", "Set the kafka topic") do |topic|
      config[:topic] = topic
    end

    # Publisher-only switches.
    if publish?
      opts.on("-c", "--compression no|gzip|snappy", "Set the compression method") do |meth|
        config[:compression] = string_to_compression(meth)
      end

      opts.on("-m","--message MESSAGE", "Message to send") do |msg|
        config[:message] = msg
      end
    end

    opts.separator ""
    opts.on("--help", "show the help") do
      config[:help] = true
    end

    opts.separator ""
    opts.separator "You can set the host, port, topic and compression from the environment variables: KAFKA_HOST, KAFKA_PORT, KAFKA_TOPIC AND KAFKA_COMPRESSION"
  end
end
#parse_args(args = ARGV) ⇒ Object
56 57 58 |
# File 'lib/kafka/cli.rb', line 56
# Runs the command-line arguments through a freshly built option_parser.
#
# @param args [Array<String>] arguments to parse; defaults to ARGV
# @return [Object] whatever the parser's +parse+ call returns
def parse_args(args = ARGV)
  parser = option_parser
  parser.parse(args)
end
#program_name(pn = $0) ⇒ Object
119 120 121 |
# File 'lib/kafka/cli.rb', line 119
# Returns the last path component of the given program path.
#
# @param pn [String] path of the running program; defaults to $0
# @return [String] the file name without its directory part
def program_name(pn = $0)
  _directory, base = File.split(pn)
  base
end
#publish(options) ⇒ Object
136 137 138 139 140 141 142 |
# File 'lib/kafka/cli.rb', line 136
# Publishes messages in an endless loop.
#
# Installs an INT signal handler that calls +exit+, creates one Producer
# from +options+, then calls publish_loop with that producer repeatedly.
#
# @param options [Object] passed unchanged to Producer.new
#   (publish! passes the config hash)
def publish(options)
  trap(:INT) { exit }
  producer = Producer.new(options)
  loop do
    publish_loop(producer)
  end
end
#publish! ⇒ Object
22 23 24 25 26 27 28 29 30 31 |
# File 'lib/kafka/cli.rb', line 22
# Entry point for publishing.
#
# Loads settings from the environment, then from the command line, and
# validates them. If config[:message] is set, that single message is removed
# from the config and pushed; otherwise control passes to +publish+.
def publish!
  read_env
  parse_args
  validate_config

  return publish(config) unless config[:message]

  push(config, config.delete(:message))
end
#publish? ⇒ Boolean
111 112 113 |
# File 'lib/kafka/cli.rb', line 111
# Tells whether the running program is named "kafka-publish".
#
# @return [Boolean]
def publish?
  current = program_name
  current == "kafka-publish"
end
#publish_loop(producer) ⇒ Object
144 145 146 147 |
# File 'lib/kafka/cli.rb', line 144
# Performs one publish step: takes the value returned by read_input, wraps
# it in a Message and pushes it through the producer.
#
# @param producer [#push] object that receives the new Message
# @return [Object] whatever +producer.push+ returns
def publish_loop(producer)
  message = read_input
  producer.push(Message.new(message))
end
#push(options, message) ⇒ Object
132 133 134 |
# File 'lib/kafka/cli.rb', line 132
# Sends a single message: creates a Producer from +options+ and pushes a
# Message built from +message+.
#
# @param options [Object] passed unchanged to Producer.new
# @param message [Object] passed unchanged to Message.new
# @return [Object] whatever the producer's +push+ returns
def push(options, message)
  Producer.new(options).push(Message.new(message))
end
#read_env(env = ENV) ⇒ Object
60 61 62 63 64 65 |
# File 'lib/kafka/cli.rb', line 60
# Copies settings from environment variables into +config+.
#
# Only variables that are present are applied:
# KAFKA_HOST -> :host, KAFKA_PORT -> :port (via +to_i+), KAFKA_TOPIC -> :topic,
# KAFKA_COMPRESSION -> :compression (via string_to_compression).
#
# @param env [#[]] environment lookup; defaults to ENV
def read_env(env = ENV)
  host        = env["KAFKA_HOST"]
  port        = env["KAFKA_PORT"]
  topic       = env["KAFKA_TOPIC"]
  compression = env["KAFKA_COMPRESSION"]

  config[:host]        = host if host
  config[:port]        = port.to_i if port
  config[:topic]       = topic if topic
  config[:compression] = string_to_compression(compression) if compression
end
#read_input ⇒ Object
149 150 151 152 153 154 155 156 157 |
# File 'lib/kafka/cli.rb', line 149
# Reads one line from standard input.
#
# @return [String] the line with surrounding whitespace stripped
# Calls +exit+ when no line is available.
def read_input
  line = $stdin.gets
  exit unless line # gets returns nil at EOF

  line.strip
end
#string_to_compression(meth) ⇒ Object
123 124 125 126 127 128 129 130 |
# File 'lib/kafka/cli.rb', line 123
# Maps a compression name to the matching Message compression constant.
#
# @param meth [String] "no", "gzip" or "snappy"
# @return [Object] Message::NO_COMPRESSION, Message::GZIP_COMPRESSION or
#   Message::SNAPPY_COMPRESSION
# @raise [RuntimeError] for any other value
def string_to_compression(meth)
  case meth
  when "no"
    Message::NO_COMPRESSION
  when "gzip"
    Message::GZIP_COMPRESSION
  when "snappy"
    Message::SNAPPY_COMPRESSION
  else
    raise "No supported compression"
  end
end
#subscribe(options) ⇒ Object
159 160 161 162 163 164 165 166 167 |
# File 'lib/kafka/cli.rb', line 159
# Consumes messages and prints each payload on its own line.
#
# Installs an INT signal handler that calls +exit+, creates a Consumer from
# +options+, and hands a block to the consumer's +loop+. Each batch yielded
# to the block is iterated and every element's +payload+ is printed.
#
# @param options [Object] passed unchanged to Consumer.new
#   (subscribe! passes the config hash)
def subscribe(options)
  trap(:INT) { exit }
  consumer = Consumer.new(options)
  consumer.loop do |messages|
    messages.each do |message|
      puts message.payload
    end
  end
end
#subscribe! ⇒ Object
34 35 36 37 38 39 |
# File 'lib/kafka/cli.rb', line 34
# Entry point for subscribing.
#
# Loads settings from the environment, then from the command line,
# validates them, and passes the resulting config to +subscribe+.
def subscribe!
  read_env
  parse_args
  validate_config

  options = config
  subscribe(options)
end
#subscribe? ⇒ Boolean
115 116 117 |
# File 'lib/kafka/cli.rb', line 115
# Tells whether the running program is named "kafka-subscribe".
#
# @return [Boolean]
def subscribe?
  current = program_name
  current == "kafka-subscribe"
end
#validate_config ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/kafka/cli.rb', line 41
# Checks the collected configuration and fills in defaults.
#
# * If config[:help] is set, prints the help text and exits.
# * Missing :host / :port default to IO::HOST / IO::PORT.
#   NOTE(review): IO is resolved from the enclosing namespace, presumably
#   Kafka::IO rather than Ruby's core IO - confirm.
# * config[:topic] must be a String; otherwise the error message and the
#   help text are printed and the process exits.
#
# NOTE(review): both exits use a bare +exit+, so the error path also ends
# with a success status - confirm that is intended.
#
# @return [true] when the configuration is usable
def validate_config
  if config[:help]
    puts help
    exit
  end

  config[:host] ||= IO::HOST
  config[:port] ||= IO::PORT
  config[:topic].is_a?(String) || raise("Missing topic")
rescue RuntimeError => e
  # SystemExit raised by the help branch is not a RuntimeError, so it
  # passes through this handler untouched.
  puts e.message
  puts help
  exit
end