Class: HBase

Inherits:
Object
  • Object
show all
Includes:
Admin
Defined in:
lib/hbase-jruby/hbase.rb,
lib/hbase-jruby.rb,
lib/hbase-jruby/row.rb,
lib/hbase-jruby/cell.rb,
lib/hbase-jruby/util.rb,
lib/hbase-jruby/admin.rb,
lib/hbase-jruby/table.rb,
lib/hbase-jruby/schema.rb,
lib/hbase-jruby/scoped.rb,
lib/hbase-jruby/version.rb,
lib/hbase-jruby/byte_array.rb,
lib/hbase-jruby/dependency.rb,
lib/hbase-jruby/table/admin.rb,
lib/hbase-jruby/table/mutation.rb,
lib/hbase-jruby/batch_exception.rb,
lib/hbase-jruby/table/inspection.rb,
lib/hbase-jruby/scoped/aggregation.rb,
lib/hbase-jruby/table/batch_action.rb,
lib/hbase-jruby/table/checked_operation.rb

Overview

HBase connection

Defined Under Namespace

Modules: Admin, JRuby, Util Classes: BatchException, ByteArray, Cell, Row, Schema, Scoped, Table

Constant Summary collapse

Result =

For backward compatibility

HBase::Row
# Maven profiles known to HBase.resolve_dependency!, mapping a distribution
# prefix to the latest known version of that distribution.
# Frozen so the shared table cannot be altered by accident; callers that need
# to override a version should work on a dup.
SUPPORTED_PROFILES = {
  # Prefix => Latest known version
  'cdh5.1' => 'cdh5.1.0',
  'cdh5.0' => 'cdh5.0.0',
  'cdh4.6' => 'cdh4.6.0',
  'cdh4.5' => 'cdh4.5.0',
  'cdh4.4' => 'cdh4.4.0',
  'cdh4.3' => 'cdh4.3.2',
  'cdh4.2' => 'cdh4.2.2',
  'cdh4.1' => 'cdh4.1.5',
  'cdh3'   => 'cdh3u6',
  '0.98'   => '0.98.0-hadoop2',
  '0.96'   => '0.96.2-hadoop2',
  '0.95'   => '0.95.2-hadoop2',
  '0.94'   => '0.94.18',
  '0.92'   => '0.92.2',
}.freeze
@@log4j =
nil

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = {}) ⇒ HBase

Connects to HBase

Parameters:

  • config (Hash) (defaults to: {})

    Key-value pairs used to build an HBaseConfiguration, or an existing org.apache.hadoop.conf.Configuration to use as-is



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/hbase-jruby/hbase.rb', line 49

# Connects to HBase.
#
# @param [Hash, org.apache.hadoop.conf.Configuration] config
#   Either a ready-made Hadoop Configuration (used as-is) or key-value pairs
#   whose keys and values are copied, as strings, into a new HBaseConfiguration
# @raise [NameError] if the Hadoop classes are not on the classpath
def initialize config = {}
  # Touch a core Hadoop class first so a missing CLASSPATH surfaces as an
  # actionable error instead of an obscure failure further below.
  begin
    org.apache.hadoop.conf.Configuration
  rescue NameError
    raise NameError.new(
      "Required Java classes not loaded. Set up CLASSPATH or try `HBase.resolve_dependency!`")
  end

  HBase.import_java_classes!

  @config =
    case config
    when org.apache.hadoop.conf.Configuration then config
    else
      hbase_conf = HBaseConfiguration.create
      config.each { |key, value| hbase_conf.set(key.to_s, value.to_s) }
      hbase_conf
    end

  @connection = HConnectionManager.createConnection(@config)

  # A connection that can hand out tables itself needs no HTablePool; the
  # pool (deprecated in recent HBase) is only a fallback.
  @htable_pool =
    unless @connection.respond_to?(:getTable)
      HTablePool.new(@config, java.lang.Integer::MAX_VALUE)
    end

  @threads = Set.new
  @mutex   = Mutex.new
  @schema  = Schema.new
  @closed  = false
end

Instance Attribute Details

#config ⇒ Object (readonly)



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/hbase-jruby/hbase.rb', line 8

class HBase
  attr_reader :config, :schema

  include Admin

  # Configures Log4j logging.
  #
  # If Log4j is not loaded yet, the argument is stored in @@log4j instead so
  # that it can be applied later (resolve_dependency! re-assigns it after
  # loading the JAR files).
  #
  # @overload HBase.log4j=(filename)
  #   Configure Log4j logging with the given file
  #   @param [String] filename Path to log4j.properties or log4j.xml file
  #   @return [String]
  # @overload HBase.log4j=(hash)
  #   Configure Log4j logging with the given Hash
  #   @param [Hash] hash Log4j properties in Ruby Hash
  #   @return [Hash]
  # @overload HBase.log4j=(props)
  #   Configure Log4j logging with the given Properties
  #   @param [java.util.Properties] props Properties object
  #   @return [java.util.Properties]
  def self.log4j= arg
    # Probe for Log4j the same way #initialize probes for Hadoop: a missing
    # Java class surfaces as a NameError.
    configurator =
      begin
        org.apache.log4j.PropertyConfigurator
      rescue NameError
        nil
      end

    if configurator
      case arg
      when Hash
        props = java.util.Properties.new
        arg.each do |k, v|
          props.setProperty k.to_s, v.to_s
        end
        configurator.configure props
      when java.util.Properties
        # Already a Properties object: hand it over directly instead of
        # treating it as a file name (File.extname would raise TypeError).
        configurator.configure arg
      else
        case File.extname(arg).downcase
        when '.xml'
          org.apache.log4j.xml.DOMConfigurator.configure arg
        else
          configurator.configure arg
        end
      end
    else
      @@log4j = arg
    end
  end

  # Connects to HBase
  # @param [Hash, org.apache.hadoop.conf.Configuration] config
  #   Key-value pairs to build an HBaseConfiguration from, or an existing
  #   Configuration to use as-is
  # @raise [NameError] if the Hadoop classes are not on the classpath
  def initialize config = {}
    begin
      org.apache.hadoop.conf.Configuration
    rescue NameError
      raise NameError.new(
        "Required Java classes not loaded. Set up CLASSPATH or try `HBase.resolve_dependency!`")
    end

    HBase.import_java_classes!

    @config =
      case config
      when org.apache.hadoop.conf.Configuration
        config
      else
        HBaseConfiguration.create.tap do |hbcfg|
          config.each do |k, v|
            hbcfg.set k.to_s, v.to_s
          end
        end
      end
    @connection = HConnectionManager.createConnection @config
    # Fall back to the (deprecated) HTablePool only when the connection
    # cannot hand out tables itself
    @htable_pool =
      if @connection.respond_to?(:getTable)
        nil
      else
        HTablePool.new @config, java.lang.Integer::MAX_VALUE
      end
    @threads = Set.new
    @mutex   = Mutex.new
    @schema  = Schema.new
    @closed  = false
  end

  # Returns if this instance is backed by an HTablePool which is deprecated
  # in the recent versions of HBase
  # @return [Boolean]
  def use_table_pool?
    !@htable_pool.nil?
  end

  # Returns an HBaseAdmin object for administration
  # @yield [admin] An HBaseAdmin object
  # @yieldparam [org.apache.hadoop.hbase.client.HBaseAdmin] admin
  # @return [org.apache.hadoop.hbase.client.HBaseAdmin]
  def admin
    if block_given?
      with_admin { |admin| yield admin }
    else
      check_closed
      HBaseAdmin.new @config
    end
  end

  # Closes HTablePool and connection. Calling it again is a no-op.
  # @return [nil]
  def close
    @mutex.synchronize do
      unless @closed
        @closed = true
        begin
          @htable_pool.close if use_table_pool?
          clear_thread_locals
        ensure
          # Always release the underlying connection, even if closing the
          # pool or clearing the thread-locals failed
          @connection.close
        end

        # To be deprecated
        begin
          HConnectionManager.deleteConnection(@config)
        rescue ArgumentError
          # HBase 0.92 or below
          HConnectionManager.deleteConnection(@config, true)
        end if use_table_pool?
      end
    end
    nil
  end

  # Returns whether the connection is closed
  # @return [Boolean]
  def closed?
    @closed
  end

  # Returns the list of HBase::Table instances
  # @return [Array<HBase::Table>]
  def tables
    check_closed
    table_names.map { |tn| table(tn) }
  end

  # Returns the list of table names
  # @return [Array<String>]
  def table_names
    check_closed
    with_admin { |admin| admin.list_tables.map(&:name_as_string) }
  end
  alias list table_names

  # Creates an HBase::Table instance for the specified name
  # @param [#to_s] table_name The name of the table
  # @yield [table] If a block is given, the table is yielded and the block's
  #   value is returned
  # @return [HBase::Table]
  def table table_name
    check_closed

    ht = HBase::Table.send :new, self, @config, table_name

    if block_given?
      yield ht
    else
      ht
    end
  end
  alias [] table

  # Returns an Array of snapshot information
  # @return [Array<Hash>]
  def snapshots
    with_admin { |admin| admin.listSnapshots }.map { |sd|
      props = sd.getAllFields.map { |k, v|
        [k.name.to_sym, v.respond_to?(:name) ? v.name : v]
      }
      Hash[props]
    }
  end

  # Replaces the schema with one built from the given Hash of
  # table => definition pairs
  # @param [Hash] hash
  # @raise [ArgumentError] if hash is not a Hash
  # @return [HBase::Schema]
  def schema= hash
    unless hash.is_a?(Hash)
      raise ArgumentError, "invalid schema: Hash required"
    end

    schema = Schema.new
    hash.each do |table, definition|
      schema[table] = definition
    end
    @schema = schema
  end

  # Reset underlying HTablePool
  # @deprecated
  # @raise [RuntimeError] if this instance does not use a table pool
  # @return [nil]
  def reset_table_pool
    raise RuntimeError, 'Not using table pool' unless use_table_pool?

    @mutex.synchronize do
      clear_thread_locals
      @htable_pool.close
      @htable_pool = HTablePool.new @config, java.lang.Integer::MAX_VALUE
    end
    nil
  end

  private

  # Tracks thread t and closes the tables cached (under :hbase_jruby, keyed
  # by this instance) by threads that are no longer alive.
  def register_thread t
    # (NOTE) The cleanup routine can be inefficient when the number of
    # concurrent threads becomes large. However, since it is not likely that
    # there will be more than a few hundred threads in a typical JRuby process
    # and the code is executed only once per thread, let's simply assume that
    # it's okay.
    @mutex.synchronize do
      check_closed
      @threads << t
      alives, deads = @threads.partition { |e| e.alive? }
      @threads = Set.new(alives)
      deads.each do |dead|
        # A thread may have no thread-local map at all
        locals = dead[:hbase_jruby]
        next unless locals
        (locals.delete(self) || {}).each do |_, htable|
          htable.close rescue nil
        end
      end
    end
  end

  # Drops this instance's entry from every tracked thread's :hbase_jruby map
  def clear_thread_locals
    # Cleanup thread-local references
    @threads.each do |thr|
      locals = thr[:hbase_jruby]
      locals.delete self if locals
    end
  end

  # Obtains a table from the HTablePool if one is in use, else from the
  # connection
  def get_htable name
    (@htable_pool || @connection).get_table name
  end

  # @raise [RuntimeError] if #close has already been called
  def check_closed
    raise RuntimeError, "Connection already closed" if closed?
  end
end

#java ⇒ Object (readonly)



# File 'lib/hbase-jruby/cell.rb', line 3

#name ⇒ Object (readonly)



# File 'lib/hbase-jruby/table.rb', line 5

#schema ⇒ Object

Returns the value of attribute schema.



9
10
11
# File 'lib/hbase-jruby/hbase.rb', line 9

# Returns the current schema object (the one set up by #initialize or
# replaced via #schema=).
# @return [HBase::Schema]
def schema; @schema; end

#table ⇒ Object (readonly) Also known as: []



148
149
150
151
152
153
154
155
156
157
158
# File 'lib/hbase-jruby/hbase.rb', line 148

# Builds an HBase::Table bound to this connection and its configuration.
#
# @param [#to_s] table_name The name of the table
# @yield [table] When a block is given, the new table is yielded to it
# @return [HBase::Table, Object] The table, or the block's value if a block
#   was given
def table table_name
  check_closed

  handle = HBase::Table.send(:new, self, @config, table_name)
  return yield(handle) if block_given?

  handle
end

Class Method Details

.ByteArray(*values) ⇒ HBase::ByteArray

Shortcut method to HBase::ByteArray.new

Parameters:

  • values (*Object)

Returns:

  • (HBase::ByteArray)



6
7
8
# File 'lib/hbase-jruby/byte_array.rb', line 6

# Shortcut for ByteArray.new.
#
# @param [*Object] values Values forwarded unchanged to ByteArray.new
# @return [HBase::ByteArray]
def ByteArray(*values)
  ByteArray.new(*values)
end

.import_java_classes! ⇒ Array<String>

Import Java classes (Prerequisite for classes in hbase-jruby)

Returns:

  • (Array<String>)

    List of Java classes NOT found



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/hbase-jruby/dependency.rb', line 136

# Imports the Java classes hbase-jruby relies on into the Ruby classes and
# modules that use them (a prerequisite for the classes in hbase-jruby).
#
# When every class has been found, this method redefines itself on the
# receiver as a stub returning an empty Array, so later calls skip the work.
#
# @return [Array<String>] List of Java classes NOT found
def import_java_classes!
  # Imports each listed class into its owner; returns the names that failed
  import_into = lambda { |mapping|
    mapping.flat_map { |owner, names|
      owner.class_eval do
        names.reject { |name|
          begin
            java_import name
            true
          rescue NameError
            false
          end
        }
      end
    }
  }

  not_found = import_into.call(
    HBase => %w[
      org.apache.hadoop.hbase.HBaseConfiguration
      org.apache.hadoop.hbase.client.HBaseAdmin
      org.apache.hadoop.hbase.client.HConnectionManager
      org.apache.hadoop.hbase.client.HTablePool
    ],
    HBase::Util => %w[
      java.nio.ByteBuffer
      org.apache.hadoop.hbase.KeyValue
      org.apache.hadoop.hbase.util.Bytes
    ],
    HBase::ByteArray => %w[
      java.util.Arrays
      org.apache.hadoop.hbase.util.Bytes
    ],
    HBase::Cell => %w[
      org.apache.hadoop.hbase.KeyValue
    ],
    HBase::Result => %w[
      org.apache.hadoop.hbase.util.Bytes
    ],
    # hfile.Compression <= 0.94
    HBase::Table => %w[
      org.apache.hadoop.hbase.HColumnDescriptor
      org.apache.hadoop.hbase.HTableDescriptor
      org.apache.hadoop.hbase.client.Append
      org.apache.hadoop.hbase.client.Delete
      org.apache.hadoop.hbase.client.Increment
      org.apache.hadoop.hbase.client.Put
      org.apache.hadoop.hbase.client.RowMutations
      org.apache.hadoop.hbase.io.hfile.Compression
      org.apache.hadoop.hbase.io.compress.Compression
    ],
    HBase::Scoped => %w[
      org.apache.hadoop.hbase.client.Get
      org.apache.hadoop.hbase.client.Scan
      org.apache.hadoop.hbase.filter.BinaryComparator
      org.apache.hadoop.hbase.filter.ColumnPaginationFilter
      org.apache.hadoop.hbase.filter.ColumnRangeFilter
      org.apache.hadoop.hbase.filter.CompareFilter
      org.apache.hadoop.hbase.filter.FilterBase
      org.apache.hadoop.hbase.filter.FilterList
      org.apache.hadoop.hbase.filter.KeyOnlyFilter
      org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter
      org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter
      org.apache.hadoop.hbase.filter.PrefixFilter
      org.apache.hadoop.hbase.filter.RegexStringComparator
      org.apache.hadoop.hbase.filter.RowFilter
      org.apache.hadoop.hbase.filter.SingleColumnValueFilter
      org.apache.hadoop.hbase.filter.WhileMatchFilter
      org.apache.hadoop.hbase.client.coprocessor.AggregationClient
      org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter
    ]
  )

  # Everything resolved: replace this method with a cheap stub
  define_singleton_method(:import_java_classes!) { [] } if not_found.empty?

  not_found
end

.HBase.log4j=(filename) ⇒ String .HBase.log4j=(hash) ⇒ Hash .HBase.log4j=(props) ⇒ java.util.Properties

Overloads:

  • .HBase.log4j=(filename) ⇒ String

    Configure Log4j logging with the given file

    Parameters:

    • filename (String)

      Path to log4j.properties or log4j.xml file

    Returns:

    • (String)
  • .HBase.log4j=(hash) ⇒ Hash

    Configure Log4j logging with the given Hash

    Parameters:

    • hash (Hash)

      Log4j properties in Ruby Hash

    Returns:

    • (Hash)
  • .HBase.log4j=(props) ⇒ java.util.Properties

    Configure Log4j logging with the given Properties

    Parameters:

    • props (java.util.Properties)

      Properties object

    Returns:

    • (java.util.Properties)


25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/hbase-jruby/hbase.rb', line 25

# Configures Log4j logging.
#
# If Log4j is not loaded yet, the argument is stored in @@log4j so that it can
# be applied later (resolve_dependency! re-assigns it after loading JARs).
#
# @overload HBase.log4j=(filename)
#   @param [String] filename Path to log4j.properties or log4j.xml file
#   @return [String]
# @overload HBase.log4j=(hash)
#   @param [Hash] hash Log4j properties in Ruby Hash
#   @return [Hash]
# @overload HBase.log4j=(props)
#   @param [java.util.Properties] props Properties object
#   @return [java.util.Properties]
def self.log4j= arg
  # A missing Java class surfaces as a NameError
  configurator =
    begin
      org.apache.log4j.PropertyConfigurator
    rescue NameError
      nil
    end

  if configurator
    case arg
    when Hash
      props = java.util.Properties.new
      arg.each do |k, v|
        props.setProperty k.to_s, v.to_s
      end
      configurator.configure props
    when java.util.Properties
      # Already a Properties object: pass it through instead of treating it
      # as a file name (File.extname would raise TypeError)
      configurator.configure arg
    else
      case File.extname(arg).downcase
      when '.xml'
        org.apache.log4j.xml.DOMConfigurator.configure arg
      else
        configurator.configure arg
      end
    end
  else
    @@log4j = arg
  end
end

.resolve_dependency!(dist, options) ⇒ Array<String> .resolve_dependency!(pom_path, options) ⇒ Array<String>

Overloads:

  • .resolve_dependency!(dist, options) ⇒ Array<String>

    Resolve Hadoop and HBase dependency with a predefined Maven profile

    Parameters:

    • dist (String)

      HBase distribution: cdh5.*, cdh4.*, cdh3, 0.98, 0.96, 0.95, 0.94, 0.92, or local/hbase (uses the output of `hbase classpath`)

    • options (Hash)

      Options

    Options Hash (options):

    • :verbose (Boolean)

      Enable verbose output

    Returns:

    • (Array<String>)

      Loaded JAR files

  • .resolve_dependency!(pom_path, options) ⇒ Array<String>

    Resolve Hadoop and HBase dependency with the given Maven POM file

    Parameters:

    • pom_path (String)

      Path to POM file

    • options (Hash)

      Options

    Options Hash (options):

    • :verbose (Boolean)

      Enable verbose output

    • :profile (String)

      Maven profile

    Returns:

    • (Array<String>)

      Loaded JAR files



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/hbase-jruby/dependency.rb', line 45

# Resolves Hadoop and HBase dependencies and loads the resulting JAR files.
#
# @overload resolve_dependency!(dist, options)
#   Resolve Hadoop and HBase dependency with a predefined Maven profile
#   @param [String] dist A name starting with exactly one SUPPORTED_PROFILES
#     key (cdh5.*, cdh4.*, cdh3, 0.98, 0.96, 0.95, 0.94, 0.92), or
#     `hbase`/`local` to use the output of `hbase classpath`
#   @param [Hash] options Options
#   @option options [Boolean] :verbose Enable verbose output
#   @return [Array<String>] Loaded JAR files
# @overload resolve_dependency!(pom_path, options)
#   Resolve Hadoop and HBase dependency with the given Maven POM file
#   @param [String] pom_path Path to POM file
#   @param [Hash] options Options
#   @option options [Boolean] :verbose Enable verbose output
#   @option options [String] :profile Maven profile
#   @return [Array<String>] Loaded JAR files
# @raise [RuntimeError] if `hbase`/`mvn` is not found or Maven fails
# @raise [ArgumentError] if the profile is unknown or yields no classpath
def resolve_dependency! dist, options = {}
  # Initialized before anything that can raise, so the ensure clause never
  # sees nil and masks the original error
  tempfiles = []

  # Backward-compatibility: a bare boolean means :verbose
  options = { :verbose => options } if [true, false].include?(options)
  options = { :verbose => false }.merge(options)

  dist    = dist.to_s
  verbose = options[:verbose]

  silencer = verbose ? '' : '> /dev/null'

  jars =
    if %w[hbase local].include?(dist)
      # Check for hbase executable
      hbase = `which hbase`
      raise RuntimeError, "Cannot find `hbase` executable" if hbase.empty?
      `hbase classpath`.strip.split(':').flat_map { |e| Dir[e] }
    else
      # Check for Maven executable
      mvn = `which mvn`
      raise RuntimeError, "Cannot find `mvn` executable" if mvn.empty?

      # POM file path given (with optional profile)
      if File.exist?(dist)
        path = dist
        profile = options[:profile] && "-P #{options[:profile]}"
      # Predefined dependencies
      else
        matched_profiles = SUPPORTED_PROFILES.keys.select { |pf| dist.start_with? pf }
        if matched_profiles.length != 1
          raise ArgumentError, "Unknown profile: #{dist}"
        end
        matched_profile = matched_profiles.first
        # NOTE(review): `profiles` is presumably read by pom.xml.erb through
        # `binding` below; confirm against the template
        profiles = SUPPORTED_PROFILES.dup
        profiles[matched_profile] = dist if dist != matched_profile
        tempfiles << tf = Tempfile.new('hbase-jruby-pom')
        erb = ERB.new(File.read File.expand_path("../pom/pom.xml.erb", __FILE__))
        tf << erb.result(binding)
        tf.close(false)
        path = tf.path
        profile = "-P #{matched_profile}"
      end

      # Download dependent JAR files and build classpath string
      tempfiles << tf = Tempfile.new('hbase-jruby-classpath')
      tf.close(false)
      succeeded = system(
        "mvn org.apache.maven.plugins:maven-dependency-plugin:2.5.1:resolve " \
        "org.apache.maven.plugins:maven-dependency-plugin:2.5.1:build-classpath " \
        "-Dsilent=true -Dmdep.outputFile=#{tf.path} #{profile} -f #{path} #{silencer}")

      unless succeeded
        message = "Error occurred."
        message += " Set verbose option to see the log." unless verbose
        raise RuntimeError.new(message)
      end

      classpath = File.read(tf.path)
      if classpath.empty?
        desc =
          if options[:profile]
            "#{dist} (#{options[:profile]})"
          else
            dist
          end
        raise ArgumentError.new("Invalid profile: #{desc}")
      end
      classpath.split(':')
    end

  # Load jars; `require` returns false for JARs already loaded, so only
  # newly loaded ones are reported
  jars_loaded = jars.select { |jar|
    File.file?(jar) &&
    File.extname(jar).downcase == '.jar' &&
    require(jar)
  }

  # Apply pending log4j configuration
  HBase.log4j = @@log4j if @@log4j

  # Try importing Java classes again
  not_found = HBase.import_java_classes!
  if verbose && !not_found.empty?
    warn "Java classes not found: #{not_found.join(', ')}"
  end

  jars_loaded
ensure
  tempfiles.each { |tempfile| tempfile.unlink rescue nil }
end

Instance Method Details

#admin {|admin| ... } ⇒ org.apache.hadoop.hbase.client.HBaseAdmin

Returns an HBaseAdmin object for administration

Yields:

  • (admin)

    An HBaseAdmin object

Yield Parameters:

  • admin (org.apache.hadoop.hbase.client.HBaseAdmin)

Returns:

  • (org.apache.hadoop.hbase.client.HBaseAdmin)


94
95
96
97
98
99
100
101
# File 'lib/hbase-jruby/hbase.rb', line 94

# Returns an HBaseAdmin object for administration.
#
# With a block, the work is delegated to #with_admin and the block's value
# is returned; without one, a new HBaseAdmin is built from the configuration.
#
# @yield [admin] An HBaseAdmin object
# @yieldparam [org.apache.hadoop.hbase.client.HBaseAdmin] admin
# @return [org.apache.hadoop.hbase.client.HBaseAdmin]
def admin
  return with_admin { |hbase_admin| yield hbase_admin } if block_given?

  check_closed
  HBaseAdmin.new(@config)
end

#close ⇒ nil

Closes HTablePool and connection

Returns:

  • (nil)


105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/hbase-jruby/hbase.rb', line 105

# Closes the HTablePool (if any) and the connection. Calling it again is a
# no-op.
#
# @return [nil]
def close
  @mutex.synchronize do
    unless @closed
      @closed = true
      begin
        @htable_pool.close if use_table_pool?
        clear_thread_locals
      ensure
        # Always release the underlying connection, even if closing the pool
        # or clearing the thread-locals failed
        @connection.close
      end

      # To be deprecated
      begin
        HConnectionManager.deleteConnection(@config)
      rescue ArgumentError
        # HBase 0.92 or below
        HConnectionManager.deleteConnection(@config, true)
      end if use_table_pool?
    end
  end
  nil
end

#closed? ⇒ Boolean

Returns whether the connection is closed

Returns:

  • (Boolean)


126
127
128
# File 'lib/hbase-jruby/hbase.rb', line 126

# Reports whether #close has already been called on this instance.
# @return [Boolean]
def closed?; @closed; end

#reset_table_pool ⇒ nil

Deprecated.

Reset underlying HTablePool

Returns:

  • (nil)

Raises:

  • (RuntimeError)


189
190
191
192
193
194
195
196
197
198
# File 'lib/hbase-jruby/hbase.rb', line 189

# Replaces the underlying HTablePool with a fresh one.
#
# While holding the instance mutex, thread-local references are cleared, the
# old pool is closed and a new pool is created from the same configuration.
#
# @deprecated
# @raise [RuntimeError] if this instance is not backed by an HTablePool
# @return [nil]
def reset_table_pool
  unless use_table_pool?
    raise RuntimeError, 'Not using table pool'
  end

  @mutex.synchronize do
    clear_thread_locals
    @htable_pool.close
    max_size = java.lang.Integer::MAX_VALUE
    @htable_pool = HTablePool.new(@config, max_size)
  end

  nil
end

#snapshots ⇒ Array<Hash>

Returns an Array of snapshot information

Returns:

  • (Array<Hash>)


163
164
165
166
167
168
169
170
# File 'lib/hbase-jruby/hbase.rb', line 163

# Returns an Array of snapshot information.
#
# Each snapshot descriptor becomes a Hash keyed by field name (as a Symbol);
# values that respond to #name are replaced by that name.
#
# @return [Array<Hash>]
def snapshots
  descriptors = with_admin { |admin| admin.listSnapshots }
  descriptors.map do |descriptor|
    pairs = descriptor.getAllFields.map do |field, value|
      plain = value.respond_to?(:name) ? value.name : value
      [field.name.to_sym, plain]
    end
    Hash[pairs]
  end
end

#table_names ⇒ Array<String> Also known as: list

Returns the list of table names

Returns:

  • (Array<String>)


139
140
141
142
# File 'lib/hbase-jruby/hbase.rb', line 139

# Returns the names of all tables known to the cluster.
# @return [Array<String>]
def table_names
  check_closed
  with_admin do |admin|
    admin.list_tables.map { |descriptor| descriptor.name_as_string }
  end
end

#tables ⇒ Array<HBase::Table>

Returns the list of HBase::Table instances

Returns:

  • (Array<HBase::Table>)



132
133
134
135
# File 'lib/hbase-jruby/hbase.rb', line 132

# Returns an HBase::Table for every table name reported by #table_names.
# @return [Array<HBase::Table>]
def tables
  check_closed
  names = table_names
  names.map { |name| table(name) }
end

#use_table_pool? ⇒ Boolean

Returns whether this instance is backed by an HTablePool, which is deprecated in recent versions of HBase

Returns:

  • (Boolean)


86
87
88
# File 'lib/hbase-jruby/hbase.rb', line 86

# Tells whether an HTablePool was set up for this instance (HTablePool is
# deprecated in recent versions of HBase).
# @return [Boolean]
def use_table_pool?
  pool = @htable_pool
  !pool.nil?
end