Class: RBHive::Connection
- Inherits:
-
Object
- Object
- RBHive::Connection
show all
- Defined in:
- lib/rbhive/connection.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(server, port = 10_000, logger = StdOutLogger.new) ⇒ Connection
Returns a new instance of Connection.
34
35
36
37
38
39
40
41
42
|
# File 'lib/rbhive/connection.rb', line 34
# Builds the Thrift socket -> buffered transport -> binary protocol -> Hive
# client chain for the given endpoint and stores the logger and a mutex.
#
# The transport is only constructed here; nothing in this constructor opens
# it (see #open).
#
# @param server [String] host passed to Thrift::Socket
# @param port [Integer] port passed to Thrift::Socket (defaults to 10_000)
# @param logger [#info] receives informational messages (defaults to a new StdOutLogger)
def initialize(server, port = 10_000, logger = StdOutLogger.new)
  # Each layer wraps the previous one, so construction order matters.
  @socket = Thrift::Socket.new(server, port)
  @transport = Thrift::BufferedTransport.new(@socket)
  @protocol = Thrift::BinaryProtocol.new(@transport)
  @client = Hive::Thrift::ThriftHive::Client.new(@protocol)

  @mutex = Mutex.new
  @logger = logger
  @logger.info("Connecting to #{server} on port #{port}")
end
|
Dynamic Method Handling
This class handles dynamic methods through the method_missing method, which forwards otherwise-undefined calls to the underlying client.
#method_missing(meth, *args) ⇒ Object
129
130
131
|
# File 'lib/rbhive/connection.rb', line 129
# Forwards any message this object does not define itself to the underlying
# client, so client calls can be made directly on the connection.
#
# @param meth [Symbol] name of the method that was not found on this object
# @param args [Array] positional arguments, passed through unchanged
# @param block [Proc, nil] optional block, passed through to the client call
# @return [Object] whatever the client method returns
# @raise [NoMethodError] typically raised by the client when it does not
#   implement +meth+ either
def method_missing(meth, *args, &block)
  # Forward the block too; otherwise block-taking client methods would
  # silently run without it.
  client.send(meth, *args, &block)
end

# Keeps respond_to? (and Object#method) consistent with #method_missing by
# reporting the methods the client is able to handle.
#
# @param meth [Symbol] method name being queried
# @param include_private [Boolean] whether private methods should count
# @return [Boolean]
def respond_to_missing?(meth, include_private = false)
  client.respond_to?(meth, include_private) || super
end
|
Instance Attribute Details
#client ⇒ Object
Returns the value of attribute client.
32
33
34
|
# File 'lib/rbhive/connection.rb', line 32
# Reader for the client object held in @client.
#
# @return [Object] the current value of @client
def client
  @client
end
|
Instance Method Details
#add_columns(schema) ⇒ Object
125
126
127
|
# File 'lib/rbhive/connection.rb', line 125
# Executes the statement produced by the schema's add_columns_statement.
#
# @param schema [#add_columns_statement] object that supplies the statement text
# @return [Object] whatever #execute returns
def add_columns(schema)
  statement = schema.add_columns_statement
  execute(statement)
end
|
#close ⇒ Object
48
49
50
|
# File 'lib/rbhive/connection.rb', line 48
# Closes the transport held in @transport.
#
# @return [Object] whatever the transport's close returns
def close
  @transport.close
end
|
#create_table(schema) ⇒ Object
112
113
114
|
# File 'lib/rbhive/connection.rb', line 112
# Executes the statement produced by the schema's create_table_statement.
#
# @param schema [#create_table_statement] object that supplies the statement text
# @return [Object] whatever #execute returns
def create_table(schema)
  statement = schema.create_table_statement
  execute(statement)
end
|
#drop_table(name) ⇒ Object
116
117
118
119
|
# File 'lib/rbhive/connection.rb', line 116
# Executes a DROP TABLE statement for the given table.
#
# @param name [TableSchema, #to_s] a TableSchema (its #name is used) or
#   anything else, which is interpolated into the statement as-is
# @return [Object] whatever #execute returns
def drop_table(name)
  # The identifier is wrapped in backticks but not otherwise escaped.
  table_name = name.is_a?(TableSchema) ? name.name : name
  execute("DROP TABLE `#{table_name}`")
end
|
#execute(query) ⇒ Object
56
57
58
|
# File 'lib/rbhive/connection.rb', line 56
# Runs a query by handing it to execute_safe.
#
# @param query [String] statement text
# @return [Object] whatever execute_safe returns
def execute(query)
  execute_safe(query)
end
|
#explain(query) ⇒ Object
60
61
62
63
64
65
|
# File 'lib/rbhive/connection.rb', line 60
# Runs the query prefixed with "EXPLAIN " and wraps everything the client
# returns from fetchAll in an ExplainResult.
#
# The work happens inside the `safe` wrapper (defined outside this view;
# presumably it serialises access to the client - confirm).
#
# @param query [String] statement to explain; must be a String because it is
#   concatenated with +
# @return [ExplainResult]
def explain(query)
  safe do
    statement = "EXPLAIN " + query
    execute_unsafe(statement)
    plan_rows = client.fetchAll
    ExplainResult.new(plan_rows)
  end
end
|
#fetch(query) ⇒ Object
80
81
82
83
84
85
86
87
|
# File 'lib/rbhive/connection.rb', line 80
# Runs the query and returns every row in a single ResultSet.
#
# The first row (nil when there are no rows) is handed to SchemaDefinition
# as the example row alongside the client's schema.
#
# @param query [String] statement text
# @return [ResultSet] all rows together with their schema definition
def fetch(query)
  safe do
    execute_unsafe(query)
    all_rows = client.fetchAll
    definition = SchemaDefinition.new(client.getSchema, all_rows.first)
    ResultSet.new(all_rows, definition)
  end
end
|
#fetch_in_batch(query, batch_size = 1_000) ⇒ Object
89
90
91
92
93
94
95
96
97
|
# File 'lib/rbhive/connection.rb', line 89
# Runs the query and yields the results to the caller's block one batch at a
# time, stopping at the first empty batch.
#
# The schema definition is built once, from the first non-empty batch, and
# reused for every later batch. A block is required.
#
# @param query [String] statement text
# @param batch_size [Integer] number of rows requested per fetchN call
# @yieldparam result_set [ResultSet] one batch of rows with the shared schema
def fetch_in_batch(query, batch_size = 1_000)
  safe do
    execute_unsafe(query)
    definition = nil
    batch = client.fetchN(batch_size)
    until batch.empty?
      definition ||= SchemaDefinition.new(client.getSchema, batch.first)
      yield ResultSet.new(batch, definition)
      batch = client.fetchN(batch_size)
    end
  end
end
|
#first(query) ⇒ Object
99
100
101
102
103
104
105
106
|
# File 'lib/rbhive/connection.rb', line 99
# Runs the query and returns only the first element of a one-row ResultSet
# built from the client's fetchOne.
#
# @param query [String] statement text
# @return [Object] the result of ResultSet#first for the fetched row
def first(query)
  safe do
    execute_unsafe(query)
    single_row = client.fetchOne
    definition = SchemaDefinition.new(client.getSchema, single_row)
    ResultSet.new([single_row], definition).first
  end
end
|
#open ⇒ Object
44
45
46
|
# File 'lib/rbhive/connection.rb', line 44
# Opens the transport held in @transport.
#
# @return [Object] whatever the transport's open returns
def open
  @transport.open
end
|
#priority=(priority) ⇒ Object
67
68
69
|
# File 'lib/rbhive/connection.rb', line 67
# Sets the "mapred.job.priority" property through #set.
#
# @param priority [#to_s] value interpolated into the SET statement by #set
def priority=(priority)
  set("mapred.job.priority", priority)
end
|
#queue=(queue) ⇒ Object
71
72
73
|
# File 'lib/rbhive/connection.rb', line 71
# Sets the "mapred.job.queue.name" property through #set.
#
# @param queue [#to_s] value interpolated into the SET statement by #set
def queue=(queue)
  set("mapred.job.queue.name", queue)
end
|
#replace_columns(schema) ⇒ Object
121
122
123
|
# File 'lib/rbhive/connection.rb', line 121
# Executes the statement produced by the schema's replace_columns_statement.
#
# @param schema [#replace_columns_statement] object that supplies the statement text
# @return [Object] whatever #execute returns
def replace_columns(schema)
  statement = schema.replace_columns_statement
  execute(statement)
end
|
#schema(example_row = []) ⇒ Object
108
109
110
|
# File 'lib/rbhive/connection.rb', line 108
# Builds a SchemaDefinition from the client's current schema.
#
# @param example_row [Object] passed to SchemaDefinition as its second
#   argument (defaults to an empty array)
# @return [SchemaDefinition]
def schema(example_row = [])
  safe do
    SchemaDefinition.new(client.getSchema, example_row)
  end
end
|
#set(name, value) ⇒ Object
75
76
77
78
|
# File 'lib/rbhive/connection.rb', line 75
# Logs the assignment, then issues a "SET name=value" statement through the
# client.
#
# Note that this calls client.execute directly rather than going through
# #execute.
#
# @param name [#to_s] property name
# @param value [#to_s] property value
# @return [Object] whatever client.execute returns
def set(name, value)
  @logger.info("Setting #{name}=#{value}")
  statement = "SET #{name}=#{value}"
  client.execute(statement)
end
|