Class: Nodule::Cassandra

Inherits:
Process show all
Defined in:
lib/nodule/cassandra.rb

Overview

Run temporary instances of Apache Cassandra. Generates random ports for rpc/storage and temporary directories for data, commit logs, etc.

The version of Cassandra is hard-coded to 1.1.0.

Constant Summary collapse

VERSION =

These two must match. Apache posts the md5’s on the download site.

"1.1.0"
MD5 =
"8befe18a4abc342d03d1fbaaa0ac836b"
CASSANDRA =
"apache-cassandra-#{VERSION}"
TARBALL =
"#{CASSANDRA}-bin.tar.gz"
TARBALL_URL =
"http://archive.apache.org/dist/cassandra/#{VERSION}/#{TARBALL}"
CACHEDIRS =

potential locations for caching the cassandra download

[
  File.join(ENV['HOME'], 'Downloads'),
  "/tmp",
]
CLIENT_CONNECT_OPTIONS =

keep large timeouts, since test systems (e.g. Jenkins workers) are often very slow

{
  :timeout => 30,
  :connect_timeout => 30,
  :retries => 10,
  :exception_classes => [],
}

Instance Attribute Summary collapse

Attributes inherited from Process

#argv, #ended, #pid, #started, #topology

Attributes inherited from Base

#prefix, #read_count, #readers, #running, #topology

Instance Method Summary collapse

Methods inherited from Process

#_apply_topology, #_kill, #clear_stderr!, #clear_stdout!, #close, #done?, #elapsed, #inspect, #iodone?, #iowait, #print, #puts, #require_stderr_count, #require_stdout_count, #reset, #status, #stderr, #stderr!, #stderr?, #stderr_pipe, #stdin_pipe, #stdout, #stdout!, #stdout?, #stdout_pipe, #stop!, #to_hash, #wait, #waitpid

Methods inherited from Base

#add_reader, #add_readers, #clear!, #done?, #join_topology!, #output, #output!, #output?, #read_until, #require_read_count, #run_readers, #stop!, #verbose, #wait, #wait_with_backoff

Constructor Details

#initialize(opts = {}) ⇒ Cassandra

Create a new Nodule::Cassandra instance. Each instance will be its own single-node Cassandra instance.

Parameters:

  • opts (Hash) (defaults to: {})

    the options for setup.

Options Hash (opts):

  • :keyspace (String)

    Keyspace name to use as the default



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/nodule/cassandra.rb', line 49

# Create a new Nodule::Cassandra instance. Each instance is its own
# single-node Cassandra, living in a fresh temporary directory and using
# randomly chosen JMX, rpc, storage and SSL storage ports.
#
# The caller's options hash (and any Array given as :stdout) is left
# untouched; a copy is extended with the readiness handler and passed on to
# the superclass constructor.
#
# @param [Hash] opts the options for setup.
# @option opts [String] :keyspace Keyspace name to use as the default ("Nodule" if omitted)
# @option opts [Object, Array] :stdout existing stdout reader(s); a single
#   value is wrapped in an Array before the readiness handler is appended
def initialize(opts={})
  @keyspace   = opts[:keyspace] || "Nodule"

  @temp = Nodule::Tempfile.new(:directory => true, :prefix => "nodule-cassandra")
  @tmp = @temp.file

  @data = File.join(@tmp, 'data')
  @caches = File.join(@tmp, 'caches')
  @commit = File.join(@tmp, 'commitlogs')

  @host = "127.0.0.1" # will support 127.0.0.2 someday
  @jmx_port = Nodule::Util.random_tcp_port
  @rpc_port = Nodule::Util.random_tcp_port
  @storage_port = Nodule::Util.random_tcp_port
  @ssl_storage_port = Nodule::Util.random_tcp_port

  @casshome = "#{@tmp}/#{CASSANDRA}"
  @pidfile = "#{@casshome}/cassandra.pid"
  @cassbin = "#{@casshome}/bin"
  @command = ["#{@cassbin}/cassandra", "-f", "-p", @pidfile]
  @config  = "#{@casshome}/conf/cassandra.yaml"
  @envfile = "#{@casshome}/conf/cassandra-env.sh"
  @log4j   = "#{@casshome}/conf/log4j-server.properties"
  @logfile = "#{@tmp}/system.log"

  # This handler reads STDOUT to determine when Cassandra is ready for client
  # access. Coerce the stdout option into an array as necessary so options can
  # still be passed in. Work on copies so the caller's hash and array are not
  # modified: a reused options hash would otherwise accumulate the readiness
  # handlers of earlier instances.
  opts = opts.dup
  readers = opts[:stdout]
  opts[:stdout] =
    if !readers
      []
    elsif readers.kind_of?(Array)
      readers.dup
    else
      [readers]
    end

  # Watch Cassandra's output to be sure when it's available, obviously, it's a bit fragile
  # but (IMO) better than sleeping or poking the TCP port.
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  opts[:stdout] << proc do |item|
    @mutex.synchronize do
      @cv.signal if item =~ /Listening for thrift clients/
    end
  end

  super({"CASSANDRA_HOME" => @casshome}, *@command, opts)
end

Instance Attribute Details

#cachesObject (readonly)

Returns the value of attribute caches.



19
20
21
# File 'lib/nodule/cassandra.rb', line 19

# Path of the 'caches' directory inside this instance's temporary directory;
# configure! writes it to cassandra.yaml as saved_caches_directory.
def caches
  @caches
end

#cassbinObject (readonly)

Returns the value of attribute cassbin.



19
20
21
# File 'lib/nodule/cassandra.rb', line 19

# Path of the bin/ directory under the unpacked Cassandra home; the
# cassandra, cassandra-cli and nodetool commands are resolved against it.
def cassbin
  @cassbin
end

#commitObject (readonly)

Returns the value of attribute commit.



19
20
21
# File 'lib/nodule/cassandra.rb', line 19

# Path of the 'commitlogs' directory inside this instance's temporary
# directory; configure! writes it to cassandra.yaml as commitlog_directory.
def commit
  @commit
end

#configObject (readonly)

Returns the value of attribute config.



19
20
21
# File 'lib/nodule/cassandra.rb', line 19

# Path of conf/cassandra.yaml under the unpacked Cassandra home; this is the
# file configure! rewrites.
def config
  @config
end

#dataObject (readonly)

Returns the value of attribute data.



19
20
21
# File 'lib/nodule/cassandra.rb', line 19

# Path of the 'data' directory inside this instance's temporary directory;
# configure! writes it to cassandra.yaml as the only data_file_directories entry.
def data
  @data
end

#envfileObject (readonly)

Returns the value of attribute envfile.



19
20
21
# File 'lib/nodule/cassandra.rb', line 19

# Path of conf/cassandra-env.sh under the unpacked Cassandra home; configure!
# rewrites its JMX_PORT setting.
def envfile
  @envfile
end

#keyspaceObject (readonly)

Returns the value of attribute keyspace.



19
20
21
# File 'lib/nodule/cassandra.rb', line 19

# Default keyspace name: the :keyspace option given to the constructor, or
# "Nodule" when none was given.
def keyspace
  @keyspace
end

#pidfileObject (readonly)

Returns the value of attribute pidfile.



19
20
21
# File 'lib/nodule/cassandra.rb', line 19

# Path of cassandra.pid under the unpacked Cassandra home; it is passed to the
# cassandra launcher with the -p flag.
def pidfile
  @pidfile
end

#rpc_portObject (readonly)

Returns the value of attribute rpc_port.



19
20
21
# File 'lib/nodule/cassandra.rb', line 19

# The rpc port chosen via Nodule::Util.random_tcp_port at construction time;
# it is written to cassandra.yaml as rpc_port and used for client/CLI connections.
def rpc_port
  @rpc_port
end

#tmpObject (readonly)

Returns the value of attribute tmp.



19
20
21
# File 'lib/nodule/cassandra.rb', line 19

# The value of Nodule::Tempfile#file for this instance's temporary directory;
# the data, caches, commitlogs and Cassandra home paths are built from it.
def tmp
  @tmp
end

Instance Method Details

#cache_tarball!(tardata) ⇒ String

Write the tarball to a file locally. Finds a directory in the CACHEDIRS list.

Parameters:

  • tardata (String)

    binary string containing tar/gzip data.

Returns:

  • (String)

    full path of the file



118
119
120
121
122
123
# File 'lib/nodule/cassandra.rb', line 118

# Write the tarball to a local cache file. Uses the first entry of CACHEDIRS
# that is an existing, writable directory.
#
# @param [String] tardata binary string containing tar/gzip data.
# @return [String] full path of the file
# @raise [RuntimeError] if none of the CACHEDIRS is a writable directory
def cache_tarball!(tardata)
  cachedir = CACHEDIRS.find { |path| File.directory?(path) && File.writable?(path) }
  raise "No writable cache directory found in #{CACHEDIRS.inspect}" unless cachedir

  cachefile = File.join(cachedir, TARBALL)
  # Block form closes (and therefore flushes) the file before returning, so
  # the tarball is complete on disk by the time it gets unpacked.
  File.open(cachefile, "wb") { |file| file.write(tardata) }
  cachefile
end

#cli(*more_args) { ... } ⇒ Object

Run a block with access to cassandra-cli’s stdio.

Parameters:

  • more_args (Array)

    additional command-line arguments

  • block (Hash)

    a customizable set of options

Yields:

  • block with CLI attached



247
248
249
250
251
252
253
254
255
# File 'lib/nodule/cassandra.rb', line 247

# Run a block with access to cassandra-cli's stdio.
#
# Starts cassandra-cli as a process joined to this node's topology, yields it
# together with its stdin/stdout/stderr pipes, then sends "quit;" if the
# process has not finished, waits (argument 3) and stops it.
#
# @param [Array] more_args additional command-line arguments
# @yield [process, stdin_pipe, stdout_pipe, stderr_pipe] block with CLI attached
def cli(*more_args)
  argv = cli_command(more_args)
  cli_process = Process.new(*argv)
  cli_process.join_topology!(@topology)
  cli_process.run

  yield cli_process, cli_process.stdin_pipe, cli_process.stdout_pipe, cli_process.stderr_pipe

  # ask the CLI to exit on its own before stopping it
  cli_process.print("quit;\n") unless cli_process.done?
  cli_process.wait(3)
  cli_process.stop
end

#cli_command(*more_args) ⇒ Array

Returns the fully-qualified cassandra-cli command with host & port set. If given a list of arguments, they’re tacked on automatically.

Parameters:

  • more_args (Array)

    additional command-line arguments

Returns:

  • (Array)

    an argv-style array ready to use with Nodule::Process or Kernel.spawn



234
235
236
# File 'lib/nodule/cassandra.rb', line 234

# Build the cassandra-cli command line with this node's host and rpc port.
# Extra arguments (nested arrays included) are flattened onto the end.
#
# @param [Array] more_args additional command-line arguments
# @return [Array] an argv-style array
def cli_command(*more_args)
  executable = File.join(@cassbin, 'cassandra-cli')
  argv = [executable, '-h', @host, '-p', @rpc_port]
  (argv << more_args).flatten
end

#client(ks = @keyspace) {|c| ... } ⇒ Cassandra

Setup and return a Cassandra client object.

Parameters:

  • ks (String) (defaults to: @keyspace)

    optional keyspace argument for the client connection

Yields:

  • (c)

Returns:

  • (Cassandra)

    connection to the temporary Cassandra instance



219
220
221
222
223
224
225
226
# File 'lib/nodule/cassandra.rb', line 219

# Set up and return a Cassandra client object connected to this node, with
# node auto-discovery disabled. If a block is given, the client is yielded
# to it before being returned.
#
# @param [String] ks optional keyspace for the client connection
# @yield [c] the freshly created client
# @return [Cassandra] connection to the temporary Cassandra instance
def client(ks=@keyspace)
  ::Cassandra.new(ks, to_s, CLIENT_CONNECT_OPTIONS).tap do |connection|
    connection.disable_node_auto_discovery!
    yield connection if block_given?
  end
end

#configure!Object

Rewrites portions of the stock Cassandra configuration. This should work fairly well over Cassandra version bumps without editing as long as the Cassandra folks don’t wildly change param names. Modifies conf/cassandra.yaml and conf/cassandra-env.sh.



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/nodule/cassandra.rb', line 138

# Rewrites portions of the stock Cassandra configuration in place:
#
# * conf/cassandra.yaml - directories, addresses, ports, partitioner and snitch
# * conf/cassandra-env.sh - the JMX port
# * conf/log4j-server.properties - the location of system.log
def configure!
  settings = YAML.load_file(@config)

  overrides = {
    "initial_token"          => 0,
    "partitioner"            => "org.apache.cassandra.dht.RandomPartitioner",
    # have to force ascii or YAML will come out as binary
    "data_file_directories"  => [@data.encode("us-ascii")],
    "commitlog_directory"    => @commit.encode("us-ascii"),
    "saved_caches_directory" => @caches.encode("us-ascii"),
    "storage_port"           => @storage_port.to_i,
    "ssl_storage_port"       => @ssl_storage_port.to_i,
    "listen_address"         => @host.encode("us-ascii"),
    "rpc_address"            => @host.encode("us-ascii"),
    "rpc_port"               => @rpc_port.to_i,
    # DSE doesn't work OOTB as a single node unless you switch to simplesnitch
    "endpoint_snitch"        => "org.apache.cassandra.locator.SimpleSnitch",
  }
  settings.merge!(overrides)

  File.open(@config, "w") do |file|
    file.puts YAML.dump(settings)
  end

  # relocate the JMX port to avoid conflicts with running instances
  env_script = File.read(@envfile).sub(/JMX_PORT=['"]?\d+['"]?/, "JMX_PORT=\"#{@jmx_port}\"")
  File.open(@envfile, "w") do |file|
    file.puts env_script
  end

  # relocate the system.log
  log4j_props = File.read(@log4j).sub(/log4j.appender.R.File=.*$/, "log4j.appender.R.File=#{@logfile}")
  File.open(@log4j, "w") do |file|
    file.puts log4j_props
  end
end

#create_keyspaceObject

Create a keyspace in the newly minted Cassandra instance.



171
172
173
174
175
176
177
178
179
# File 'lib/nodule/cassandra.rb', line 171

# Create the default keyspace (@keyspace) in the newly minted Cassandra
# instance, using SimpleStrategy with a replication factor of 1 and no
# column families. The definition is submitted through a client connected
# to the 'system' keyspace.
def create_keyspace
  definition = CassandraThrift::KsDef.new(
    :name             => @keyspace,
    :strategy_class   => 'org.apache.cassandra.locator.SimpleStrategy',
    :strategy_options => { "replication_factor" => "1" },
    :cf_defs          => []
  )

  system_client = client('system')
  system_client.add_keyspace(definition)
end

#downloadString

Downloads Cassandra tarball to memory from the Apache servers.

Returns:

  • (String)

    binary string containing the tar/gzip data.



102
103
104
105
106
107
108
109
110
111
# File 'lib/nodule/cassandra.rb', line 102

# Downloads the Cassandra tarball into memory from TARBALL_URL and verifies
# it against the expected MD5 checksum.
#
# Uses URI.open when open-uri provides it: on Ruby 3.0+ Kernel#open no longer
# opens URLs. Falls back to Kernel#open on older Rubies. The block form
# closes the underlying IO once the data has been read.
#
# @return [String] binary string containing the tar/gzip data.
# @raise [RuntimeError] if the MD5 of the downloaded data does not match MD5
def download
  tardata =
    if URI.respond_to?(:open)
      URI.open(TARBALL_URL) { |io| io.read }
    else
      open(TARBALL_URL) { |io| io.read }
    end

  digest = Digest::MD5.hexdigest(tardata)

  unless digest == MD5
    raise "Expected MD5 #{MD5} but got #{digest}."
  end

  tardata
end

#nodetool(*more_args) { ... } ⇒ Object

Parameters:

  • more_args (Array)

    additional command-line arguments

  • block (Hash)

    a customizable set of options

Yields:

  • block with CLI attached



275
276
277
278
279
280
281
282
# File 'lib/nodule/cassandra.rb', line 275

# Run a block with access to nodetool's stdio.
#
# Starts nodetool as a process joined to this node's topology, yields it
# together with its stdin/stdout/stderr pipes, then waits (argument 3) and
# stops it.
#
# @param [Array] more_args additional command-line arguments
# @yield [process, stdin_pipe, stdout_pipe, stderr_pipe] block with nodetool attached
def nodetool(*more_args)
  argv = nodetool_command(more_args)
  tool = Process.new(*argv)
  tool.join_topology!(@topology)
  tool.run

  yield tool, tool.stdin_pipe, tool.stdout_pipe, tool.stderr_pipe

  tool.wait(3)
  tool.stop
end

#nodetool_command(*more_args) ⇒ Array

Returns the fully-qualified nodetool command with host & JMX port set. If given a list of arguments, they’re tacked on automatically.

Parameters:

  • more_args (Array)

    additional command-line arguments

Returns:

  • (Array)

    an argv-style array ready to use with Nodule::Process or Kernel.spawn



263
264
265
# File 'lib/nodule/cassandra.rb', line 263

# Build the nodetool command line with this node's host and JMX port.
# Extra arguments (nested arrays included) are flattened onto the end.
#
# @param [Array] more_args additional command-line arguments
# @return [Array] an argv-style array
def nodetool_command(*more_args)
  executable = File.join(@cassbin, 'nodetool')
  argv = [executable, '-h', @host, '-p', @jmx_port]
  (argv << more_args).flatten
end

#runObject

Run the download or untar the cached tarball. Configure then start Cassandra.



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/nodule/cassandra.rb', line 184

# Unpack Cassandra (from a cached tarball if one exists in CACHEDIRS,
# otherwise download and cache it first), configure it, start the process
# and block until it reports that it is listening for thrift clients.
def run
  [@data, @caches, @commit].each { |dir| FileUtils.mkdir_p dir }

  # File.exist? rather than File.exists?: the latter was removed in Ruby 3.2.
  cached_dir = CACHEDIRS.find { |path| File.exist? File.join(path, TARBALL) }
  if cached_dir
    untar! File.join(cached_dir, TARBALL)
  else
    file = cache_tarball! download
    untar! file
  end

  configure!

  # will start Cassandra process
  super

  # wait for Cassandra to say it's ready
  # NOTE(review): the wait has no timeout, so this blocks for as long as the
  # readiness handler has not signalled @cv.
  @mutex.synchronize do @cv.wait @mutex end
end

#stopObject

Stop cassandra with a signal, clean up with recursive delete.



209
210
211
212
# File 'lib/nodule/cassandra.rb', line 209

# Stop Cassandra, then clean up its temporary files.
#
# The superclass stop runs first, and only afterwards is @temp (the
# Nodule::Tempfile directory created in the constructor) stopped.
# NOTE(review): the cleanup is described as a recursive delete of that
# directory - confirm against Nodule::Tempfile#stop.
def stop
  super
  @temp.stop
end

#to_sString

Stringify this class to the cassandra host/port string, e.g. “127.0.0.1:12345”

Returns:

  • (String)

    Cassandra connection string.



288
289
290
# File 'lib/nodule/cassandra.rb', line 288

# Stringify this instance as its "host:rpc_port" connection string,
# e.g. "127.0.0.1:12345".
#
# @return [String] Cassandra connection string.
def to_s
  "#{@host}:#{@rpc_port}"
end

#untar!(tarball) ⇒ Object

Extracts the given Cassandra tarball into this instance’s temporary directory.

Parameters:

  • tarball (String)

    full path to the tarball file



129
130
131
# File 'lib/nodule/cassandra.rb', line 129

# Extract a gzipped tarball into this instance's temporary directory (@tmp)
# by running the system `tar` command.
#
# The command is passed to Kernel#system as an argument list, so no shell is
# involved and paths containing spaces or shell metacharacters are handled
# correctly.
#
# @param [String] tarball full path to the tarball file
# @return [true, false, nil] the result of Kernel#system: true on success,
#   false on a non-zero exit status, nil if tar could not be executed
def untar!(tarball)
  system("tar", "-C", @tmp.to_s, "-xzf", tarball.to_s)
end