Class: Nodule::Cassandra
Defined in: lib/nodule/cassandra.rb
Overview
Run temporary instances of Apache Cassandra. Generates random ports for RPC, storage, and JMX, and temporary directories for data, commit logs, etc.
The version of Cassandra is hard-coded to 1.1.0.
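As an illustration of the random port selection described above, here is a minimal sketch using only the standard library. How Nodule::Util.random_tcp_port actually picks its ports is not shown on this page, so the helper name `free_tcp_port` and the bind-to-port-0 approach are assumptions, not Nodule's implementation.

```ruby
require "socket"

# Hypothetical helper (not part of Nodule): ask the OS for a currently
# unused TCP port on the loopback interface by binding to port 0.
def free_tcp_port(host = "127.0.0.1")
  server = TCPServer.new(host, 0)
  server.addr[1] # addr is [family, port, hostname, numeric address]
ensure
  server.close if server
end

port = free_tcp_port
puts "picked port #{port}"
```

The port is released before the caller uses it, so another process could in principle claim it in between; for short-lived test instances that window is usually acceptable.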
Constant Summary
- VERSION =
VERSION and MD5 must match. Apache posts the MD5 checksums on the download site.
"1.1.0"
- MD5 =
"8befe18a4abc342d03d1fbaaa0ac836b"
- CASSANDRA =
"apache-cassandra-#{VERSION}"
- TARBALL =
"#{CASSANDRA}-bin.tar.gz"
- TARBALL_URL =
"http://archive.apache.org/dist/cassandra/#{VERSION}/#{TARBALL}"
- CACHEDIRS =
potential locations for caching the cassandra download
[ File.join(ENV['HOME'], 'Downloads'), "/tmp", ]
- CLIENT_CONNECT_OPTIONS =
keep large timeouts, since test systems (e.g. Jenkins workers) are often very slow
{ :timeout => 30, :connect_timeout => 30, :retries => 10, :exception_classes => [], }
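The string constants above build on one another through interpolation. The following sketch shows how the download URL resolves for a given version; the helper names `tarball_name` and `tarball_url` are hypothetical and exist only for this illustration.

```ruby
# Hypothetical helpers mirroring how CASSANDRA, TARBALL and TARBALL_URL compose.
def tarball_name(version)
  "apache-cassandra-#{version}-bin.tar.gz"
end

def tarball_url(version)
  "http://archive.apache.org/dist/cassandra/#{version}/#{tarball_name(version)}"
end

puts tarball_url("1.1.0")
# prints http://archive.apache.org/dist/cassandra/1.1.0/apache-cassandra-1.1.0-bin.tar.gz
```

Because the MD5 constant is tied to one specific tarball, changing VERSION without updating MD5 would make the checksum comparison in #download fail.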
Instance Attribute Summary
-
#caches ⇒ Object
readonly
Returns the value of attribute caches.
-
#cassbin ⇒ Object
readonly
Returns the value of attribute cassbin.
-
#commit ⇒ Object
readonly
Returns the value of attribute commit.
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#data ⇒ Object
readonly
Returns the value of attribute data.
-
#envfile ⇒ Object
readonly
Returns the value of attribute envfile.
-
#keyspace ⇒ Object
readonly
Returns the value of attribute keyspace.
-
#pidfile ⇒ Object
readonly
Returns the value of attribute pidfile.
-
#rpc_port ⇒ Object
readonly
Returns the value of attribute rpc_port.
-
#tmp ⇒ Object
readonly
Returns the value of attribute tmp.
Attributes inherited from Process
#argv, #ended, #pid, #started, #topology
Attributes inherited from Base
#prefix, #read_count, #readers, #running, #topology
Instance Method Summary
-
#cache_tarball!(tardata) ⇒ String
Write the tarball to a file locally.
-
#cli(*more_args) { ... } ⇒ Object
Run a block with access to cassandra-cli’s stdio.
-
#cli_command(*more_args) ⇒ Array
Returns the fully qualified cassandra-cli command with host & port set.
-
#client(ks = @keyspace) {|c| ... } ⇒ Cassandra
Set up and return a Cassandra client object.
-
#configure! ⇒ Object
Rewrites portions of the stock Cassandra configuration.
-
#create_keyspace ⇒ Object
Create a keyspace in the newly minted Cassandra instance.
-
#download ⇒ String
Downloads Cassandra tarball to memory from the Apache servers.
-
#initialize(opts = {}) ⇒ Cassandra
constructor
Create a new Nodule::Cassandra instance.
-
#nodetool(*more_args) { ... } ⇒ Object
Run a block with access to nodetool’s stdio.
-
#nodetool_command(*more_args) ⇒ Array
Returns the fully qualified nodetool command with host & JMX port set.
-
#run ⇒ Object
Run the download or untar the cached tarball.
-
#stop ⇒ Object
Stop cassandra with a signal, clean up with recursive delete.
-
#to_s ⇒ String
Stringify this class to the Cassandra host/port string, e.g. “127.0.0.1:12345”.
-
#untar!(tarball) ⇒ Object
Extracts the Cassandra tarball into the temporary directory.
Methods inherited from Process
#_apply_topology, #_kill, #clear_stderr!, #clear_stdout!, #close, #done?, #elapsed, #inspect, #iodone?, #iowait, #print, #puts, #require_stderr_count, #require_stdout_count, #reset, #status, #stderr, #stderr!, #stderr?, #stderr_pipe, #stdin_pipe, #stdout, #stdout!, #stdout?, #stdout_pipe, #stop!, #to_hash, #wait, #waitpid
Methods inherited from Base
#add_reader, #add_readers, #clear!, #done?, #join_topology!, #output, #output!, #output?, #read_until, #require_read_count, #run_readers, #stop!, #verbose, #wait, #wait_with_backoff
Constructor Details
#initialize(opts = {}) ⇒ Cassandra
Create a new Nodule::Cassandra instance. Each instance will be its own single-node Cassandra instance.
# File 'lib/nodule/cassandra.rb', line 49
def initialize(opts={})
  @keyspace = opts[:keyspace] || "Nodule"

  @temp = Nodule::Tempfile.new(:directory => true, :prefix => "nodule-cassandra")
  @tmp = @temp.file

  @data = File.join(@tmp, 'data')
  @caches = File.join(@tmp, 'caches')
  @commit = File.join(@tmp, 'commitlogs')

  @host = "127.0.0.1" # will support 127.0.0.2 someday
  @jmx_port = Nodule::Util.random_tcp_port
  @rpc_port = Nodule::Util.random_tcp_port
  @storage_port = Nodule::Util.random_tcp_port
  @ssl_storage_port = Nodule::Util.random_tcp_port

  @casshome = "#{@tmp}/#{CASSANDRA}"
  @pidfile = "#{@casshome}/cassandra.pid"
  @cassbin = "#{@casshome}/bin"
  @command = ["#{@cassbin}/cassandra", "-f", "-p", @pidfile]
  @config = "#{@casshome}/conf/cassandra.yaml"
  @envfile = "#{@casshome}/conf/cassandra-env.sh"
  @log4j = "#{@casshome}/conf/log4j-server.properties"
  @logfile = "#{@tmp}/system.log"

  # This handler reads STDOUT to determine when Cassandra is ready for client
  # access. Coerce the stdout option into an array as necessary so options can
  # still be passed in.
  if opts[:stdout]
    unless opts[:stdout].kind_of? Array
      opts[:stdout] = [ opts.delete(:stdout) ]
    end
  else
    opts[:stdout] = []
  end

  # Watch Cassandra's output to be sure when it's available. Obviously, it's a bit fragile
  # but (IMO) better than sleeping or poking the TCP port.
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  opts[:stdout] << proc do |item|
    @mutex.synchronize do
      @cv.signal if item =~ /Listening for thrift clients/
    end
  end

  super({"CASSANDRA_HOME" => @casshome}, *@command, opts)
end
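The constructor accepts a :stdout option that may be absent, a single handler, or an array of handlers, and always appends its own readiness watcher. A minimal standalone sketch of that coercion follows; `with_stdout_handler` is a hypothetical name, and unlike the constructor it returns a new hash instead of modifying the caller's.

```ruby
# Hypothetical helper: normalize opts[:stdout] to an Array of handlers and
# append one more, without mutating the hash that was passed in.
def with_stdout_handler(opts, handler)
  existing = opts[:stdout]
  handlers =
    if existing.nil?
      []
    elsif existing.kind_of?(Array)
      existing.dup
    else
      [existing]
    end
  opts.merge(:stdout => handlers + [handler])
end

user_handler  = proc { |line| puts "user saw: #{line}" }
ready_watcher = proc { |line| puts "ready!" if line =~ /Listening for thrift clients/ }

opts = with_stdout_handler({ :stdout => user_handler }, ready_watcher)
opts[:stdout].each { |handler| handler.call("Listening for thrift clients...") }
```

Handlers run in the order they were added, so a caller-supplied handler sees each line before the readiness watcher does.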
Instance Attribute Details
#caches ⇒ Object (readonly)
Returns the value of attribute caches.
# File 'lib/nodule/cassandra.rb', line 19
def caches
  @caches
end
#cassbin ⇒ Object (readonly)
Returns the value of attribute cassbin.
# File 'lib/nodule/cassandra.rb', line 19
def cassbin
  @cassbin
end
#commit ⇒ Object (readonly)
Returns the value of attribute commit.
# File 'lib/nodule/cassandra.rb', line 19
def commit
  @commit
end
#config ⇒ Object (readonly)
Returns the value of attribute config.
# File 'lib/nodule/cassandra.rb', line 19
def config
  @config
end
#data ⇒ Object (readonly)
Returns the value of attribute data.
# File 'lib/nodule/cassandra.rb', line 19
def data
  @data
end
#envfile ⇒ Object (readonly)
Returns the value of attribute envfile.
# File 'lib/nodule/cassandra.rb', line 19
def envfile
  @envfile
end
#keyspace ⇒ Object (readonly)
Returns the value of attribute keyspace.
# File 'lib/nodule/cassandra.rb', line 19
def keyspace
  @keyspace
end
#pidfile ⇒ Object (readonly)
Returns the value of attribute pidfile.
# File 'lib/nodule/cassandra.rb', line 19
def pidfile
  @pidfile
end
#rpc_port ⇒ Object (readonly)
Returns the value of attribute rpc_port.
# File 'lib/nodule/cassandra.rb', line 19
def rpc_port
  @rpc_port
end
#tmp ⇒ Object (readonly)
Returns the value of attribute tmp.
# File 'lib/nodule/cassandra.rb', line 19
def tmp
  @tmp
end
Instance Method Details
#cache_tarball!(tardata) ⇒ String
Write the tarball to a file locally. Finds a directory in the CACHEDIRS list.
# File 'lib/nodule/cassandra.rb', line 118
def cache_tarball!(tardata)
  cachedir = (CACHEDIRS.select { |path| File.directory?(path) and File.writable?(path) })[0]
  cachefile = File.join(cachedir, TARBALL)
  File.open(cachefile, "wb").write(tardata)
  cachefile
end
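The directory selection above can be tried in isolation. The sketch below is a hypothetical variant, not the library's code: `cache_file!` takes the candidate list as a parameter, raises a descriptive error when no candidate is usable (in the listing above, File.join would receive nil in that case), and uses the block form of File.open so the file is closed before the path is returned.

```ruby
require "tmpdir"

# Hypothetical helper: write data into the first usable directory from
# candidates and return the resulting path.
def cache_file!(candidates, filename, data)
  cachedir = candidates.find { |path| File.directory?(path) && File.writable?(path) }
  raise "no writable cache directory in #{candidates.inspect}" if cachedir.nil?

  cachefile = File.join(cachedir, filename)
  # The block form flushes and closes the file before we return.
  File.open(cachefile, "wb") { |file| file.write(data) }
  cachefile
end

Dir.mktmpdir("nodule-sketch") do |dir|
  path = cache_file!(["/nonexistent/cache", dir], "example.tar.gz", "not really a tarball")
  puts "cached at #{path}"
end
```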
#cli(*more_args) { ... } ⇒ Object
Run a block with access to cassandra-cli’s stdio.
# File 'lib/nodule/cassandra.rb', line 247
def cli(*more_args)
  process = Process.new *cli_command(more_args)
  process.join_topology! @topology
  process.run
  yield process, process.stdin_pipe, process.stdout_pipe, process.stderr_pipe
  process.print "quit;\n" unless process.done?
  process.wait 3
  process.stop
end
#cli_command(*more_args) ⇒ Array
Returns the fully qualified cassandra-cli command with host & port set. If given a list of arguments, they’re appended automatically.
# File 'lib/nodule/cassandra.rb', line 234
def cli_command(*more_args)
  [File.join(@cassbin, 'cassandra-cli'), '-h', @host, '-p', @rpc_port, more_args].flatten
end
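The trailing flatten is what lets the extra arguments arrive either as separate values or as one nested array (which is how #cli passes them). A standalone sketch, with the instance state passed in as parameters; `build_cli_command` is a hypothetical name used only here.

```ruby
# Hypothetical stand-in for cli_command, taking the instance state as arguments.
def build_cli_command(cassbin, host, rpc_port, *more_args)
  [File.join(cassbin, "cassandra-cli"), "-h", host, "-p", rpc_port, more_args].flatten
end

# No extra arguments: more_args is [], which flatten removes.
p build_cli_command("/tmp/cass/bin", "127.0.0.1", 9160)

# Separate arguments and a nested array produce the same command.
p build_cli_command("/tmp/cass/bin", "127.0.0.1", 9160, "-k", "Nodule")
p build_cli_command("/tmp/cass/bin", "127.0.0.1", 9160, ["-k", "Nodule"])
```

The same pattern applies to nodetool_command, which differs only in the binary name and in using the JMX port.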
#client(ks = @keyspace) {|c| ... } ⇒ Cassandra
Set up and return a Cassandra client object.
# File 'lib/nodule/cassandra.rb', line 219
def client(ks=@keyspace)
  c = ::Cassandra.new(ks, self.to_s, CLIENT_CONNECT_OPTIONS)
  c.disable_node_auto_discovery!

  yield(c) if block_given?

  c
end
#configure! ⇒ Object
Rewrites portions of the stock Cassandra configuration. This should work fairly well over Cassandra version bumps without editing as long as the Cassandra folks don’t wildly change param names. Modifies conf/cassandra.yaml, conf/cassandra-env.sh, and conf/log4j-server.properties.
# File 'lib/nodule/cassandra.rb', line 138
def configure!
  conf = YAML::load_file(@config)
  conf.merge!({
    "initial_token" => 0,
    "partitioner" => "org.apache.cassandra.dht.RandomPartitioner",
    # have to force ascii or YAML will come out as binary
    "data_file_directories" => [@data.encode("us-ascii")],
    "commitlog_directory" => @commit.encode("us-ascii"),
    "saved_caches_directory" => @caches.encode("us-ascii"),
    "storage_port" => @storage_port.to_i,
    "ssl_storage_port" => @ssl_storage_port.to_i,
    "listen_address" => @host.encode("us-ascii"),
    "rpc_address" => @host.encode("us-ascii"),
    "rpc_port" => @rpc_port.to_i,
    # DSE doesn't work OOTB as a single node unless you switch to simplesnitch
    "endpoint_snitch" => "org.apache.cassandra.locator.SimpleSnitch",
  })
  File.open(@config, "w") { |file| file.puts YAML::dump(conf) }

  # relocate the JMX port to avoid conflicts with running instances
  env = File.read(@envfile)
  env.sub!(/JMX_PORT=['"]?\d+['"]?/, "JMX_PORT=\"#{@jmx_port}\"")
  File.open(@envfile, "w") { |file| file.puts env }

  # relocate the system.log
  log = File.read(@log4j)
  log.sub!(/log4j.appender.R.File=.*$/, "log4j.appender.R.File=#{@logfile}")
  File.open(@log4j, "w") do |file|
    file.puts log
  end
end
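The two rewriting techniques used by configure! (a YAML load, merge, and dump round trip, and a regex substitution on a shell file) can be exercised without a Cassandra install. In this sketch the helper names `rewrite_yaml!` and `relocate_jmx_port`, the sample keys, and the sample port numbers are all illustrative assumptions.

```ruby
require "yaml"
require "tmpdir"

# Hypothetical helper: load a YAML config, override selected keys, write it back.
def rewrite_yaml!(path, overrides)
  conf = YAML.load_file(path)
  conf.merge!(overrides)
  File.write(path, YAML.dump(conf))
  conf
end

# Hypothetical helper: point the JMX_PORT assignment at a new port,
# using the same pattern as configure!.
def relocate_jmx_port(env_text, port)
  env_text.sub(/JMX_PORT=['"]?\d+['"]?/, "JMX_PORT=\"#{port}\"")
end

Dir.mktmpdir("nodule-sketch") do |dir|
  config = File.join(dir, "cassandra.yaml")
  File.write(config, YAML.dump({ "rpc_port" => 9160, "cluster_name" => "Test Cluster" }))
  rewrite_yaml!(config, { "rpc_port" => 12345, "rpc_address" => "127.0.0.1" })
  puts File.read(config)
end

puts relocate_jmx_port(%(JMX_PORT="7199"\n), 17199)
```

Keys that are not overridden (cluster_name here) survive the round trip, although comments in the original YAML file do not.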
#create_keyspace ⇒ Object
Create a keyspace in the newly minted Cassandra instance.
# File 'lib/nodule/cassandra.rb', line 171
def create_keyspace
  ksdef = CassandraThrift::KsDef.new(
    :name => @keyspace,
    :strategy_class => 'org.apache.cassandra.locator.SimpleStrategy',
    :strategy_options => { "replication_factor" => "1" },
    :cf_defs => []
  )
  client('system').add_keyspace ksdef
end
#download ⇒ String
Downloads Cassandra tarball to memory from the Apache servers.
# File 'lib/nodule/cassandra.rb', line 102
def download
  tardata = open(TARBALL_URL).read
  digest = Digest::MD5.hexdigest(tardata)

  unless digest == MD5
    raise "Expected MD5 #{MD5} but got #{digest}."
  end

  tardata
end
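The checksum comparison can be tried without touching the network. The sketch below applies the same check to an in-memory string; `verify_md5!` is a hypothetical helper name. Note also that on Ruby 3.0 and later Kernel#open no longer fetches URLs, so the fetch step itself would likely need URI.open from open-uri there.

```ruby
require "digest/md5"

# Hypothetical helper: return data unchanged if its MD5 matches, else raise.
def verify_md5!(data, expected)
  digest = Digest::MD5.hexdigest(data)
  raise "Expected MD5 #{expected} but got #{digest}." unless digest == expected
  data
end

payload = "hello"
known   = "5d41402abc4b2a76b9719d911017c592" # MD5 of the string "hello"

verify_md5!(payload, known) # passes silently

begin
  verify_md5!(payload + "!", known)
rescue RuntimeError => e
  puts e.message
end
```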
#nodetool(*more_args) { ... } ⇒ Object
# File 'lib/nodule/cassandra.rb', line 275
def nodetool(*more_args)
  process = Process.new *nodetool_command(more_args)
  process.join_topology! @topology
  process.run
  yield process, process.stdin_pipe, process.stdout_pipe, process.stderr_pipe
  process.wait 3
  process.stop
end
#nodetool_command(*more_args) ⇒ Array
Returns the fully qualified nodetool command with host & JMX port set. If given a list of arguments, they’re appended automatically.
# File 'lib/nodule/cassandra.rb', line 263
def nodetool_command(*more_args)
  [File.join(@cassbin, 'nodetool'), '-h', @host, '-p', @jmx_port, more_args].flatten
end
#run ⇒ Object
Download the tarball, or untar the cached copy if one exists. Then configure and start Cassandra, blocking until it reports that it is ready for clients.
# File 'lib/nodule/cassandra.rb', line 184
def run
  FileUtils.mkdir_p @data
  FileUtils.mkdir_p @caches
  FileUtils.mkdir_p @commit

  cached = CACHEDIRS.select { |path| File.exists? File.join(path, TARBALL) }
  if cached.any?
    untar! File.join(cached.first, TARBALL)
  else
    file = cache_tarball! download
    untar! file
  end

  configure!

  # will start Cassandra process
  super

  # wait for Cassandra to say it's ready
  @mutex.synchronize do
    @cv.wait @mutex
  end
end
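The readiness wait pairs a stdout handler with a Mutex and a ConditionVariable. The sketch below reproduces that pattern with a plain thread standing in for the Cassandra process. It is a variant under stated assumptions rather than Nodule's code: `wait_for_line` is a hypothetical name, and the `ready` flag and timeout are additions that guard against a signal arriving before the waiter starts waiting, or never arriving at all.

```ruby
# Hypothetical helper: feed lines to a handler on another thread and block
# until one matches pattern or timeout seconds pass. Returns true on a match.
def wait_for_line(lines, pattern, timeout = 5)
  mutex = Mutex.new
  cv = ConditionVariable.new
  ready = false

  # Plays the role of the stdout handler registered in the constructor.
  handler = proc do |line|
    mutex.synchronize do
      if line =~ pattern
        ready = true
        cv.signal
      end
    end
  end

  # Stand-in for the child process writing to stdout.
  producer = Thread.new { lines.each { |line| handler.call(line) } }

  deadline = Time.now + timeout
  mutex.synchronize do
    until ready
      remaining = deadline - Time.now
      break if remaining <= 0
      cv.wait(mutex, remaining)
    end
  end

  producer.join
  ready
end

log = ["Starting up server", "Listening for thrift clients..."]
puts wait_for_line(log, /Listening for thrift clients/)
```

Checking a flag in a loop around the wait is the usual way to use a condition variable, since a signal sent while nobody is waiting is otherwise lost.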
#stop ⇒ Object
Stop cassandra with a signal, clean up with recursive delete.
# File 'lib/nodule/cassandra.rb', line 209
def stop
  super
  @temp.stop
end
#to_s ⇒ String
Stringify this class to the cassandra host/port string, e.g. “127.0.0.1:12345”
# File 'lib/nodule/cassandra.rb', line 288
def to_s
  [@host, @rpc_port].join(':')
end
#untar!(tarball) ⇒ Object
Extracts the Cassandra tarball into the temporary directory.
# File 'lib/nodule/cassandra.rb', line 129
def untar!(tarball)
  system("tar -C #{@tmp} -xzf #{tarball}")
end
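Because the command above is a single interpolated string, it is passed through a shell, and a path containing spaces would be split into several arguments. One possible alternative, sketched here as an assumption rather than as the library's behavior, is to build the command as an argument list; `untar_argv` is a hypothetical helper and the paths are made up for the example.

```ruby
require "shellwords"

# Hypothetical helper: build the tar invocation as an argument list so that
# no shell quoting is needed when it is passed to system(*argv).
def untar_argv(destdir, tarball)
  ["tar", "-C", destdir, "-xzf", tarball]
end

argv = untar_argv("/tmp/nodule cassandra", "/tmp/apache-cassandra-1.1.0-bin.tar.gz")

# Shellwords.join shows what an equivalent, safely escaped shell line looks like.
puts Shellwords.join(argv)

# system(*argv) would run tar directly, without an intermediate shell.
```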