Class: OMF::OML::OmlSqlSource
- Inherits:
-
Base::LObject
- Object
- Base::LObject
- OMF::OML::OmlSqlSource
- Defined in:
- lib/omf_oml/sql_source.rb
Overview
This class fetches the content of an SQL database and serves it as multiple OML streams.
After creating the object, the @run@ method needs to be called to start producing the streams.
Constant Summary collapse
- FALLBACK_MAPPING =
Sequel adaptors sometimes don’t return a :type identifier, but always return the :db_type. This is a list of maps which may not work for all adaptors
{ 'UNSIGNED INTEGER' => :integer, 'UNSIGNED BIGINT' => :integer }
Instance Method Summary collapse
-
#create_stream(table_name, opts = {}, &block) ⇒ Object
Call ‘block’ for every row in ‘table_name’ table.
-
#create_table(table_name, opts = {}) ⇒ Object
Return a table (more precisely an OmlTable instance) fed from the content of a table ‘table_name’ in this database.
-
#dataset(db_table_name) ⇒ Object
Return a Sequel Dataset from ‘table_name’.
-
#initialize(db_opts, row_opts = {}) ⇒ OmlSqlSource
constructor
db_opts - Options used to create a Sequel adapter.
-
#on_new_stream(key = :_, &proc) ⇒ Object
Register a proc to be called when a new stream was discovered on this endpoint.
-
#query(sql, table_name, schema) ⇒ Object
Run a query on the database and return the result as an OmlTable.
-
#query2(table_name, schema, &block) ⇒ Object
Run a query on the database and return the result as an OmlTable.
-
#run(check_every = -1)) ⇒ Object
Start checking the database for tables and create a new stream by calling the internal
report_new_table
method.
Constructor Details
#initialize(db_opts, row_opts = {}) ⇒ OmlSqlSource
db_opts - Options used to create a Sequel adapter
Sequel.connect(:adapter=>‘postgres’, :host=>‘norbit.npc.nicta.com.au’, :user=>‘oml2’, :password=>‘omlisgoodforyou’, :database=>‘openflow-demo’)
37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/omf_oml/sql_source.rb', line 37 def initialize(db_opts, row_opts = {}) OMF::Base::Loggable.init_log('OmlSqlSource') @running = false @on_new_stream_procs = {} @tables = {} @db_opts = db_opts debug "Opening DB (#{db_opts})" @db = Sequel.connect(db_opts) debug "DB: #{@db.inspect}" @row_opts = row_opts end |
Instance Method Details
#create_stream(table_name, opts = {}, &block) ⇒ Object
Call ‘block’ for every row in ‘table_name’ table.
table_name - Name of table in the SQL database opts -
:schema[Schema] Schema to use for creating row
All other options defined for OmlSqlRow#new
returns OmlSqlRow
91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/omf_oml/sql_source.rb', line 91 def create_stream(table_name, opts = {}, &block) rschema = opts.delete(:schema) schema = _schema_for_table(table_name) r = OmlSqlRow.new(table_name, schema, _def_query_for_table(table_name), opts) if block ropts = {} ropts[:schema] = rschema if rschema r.to_stream(ropts, &block) end r end |
#create_table(table_name, opts = {}) ⇒ Object
Return a table (more precisely an OmlTable instance) fed from the content of a table ‘table_name’ in this database.
table_name - Name of table in the SQL database opts -
:name - name used for returned OML Table [table_name]
:schema - Schema to use instead of default table schema
:query - Query to use instead of default one
All other options defined for OmlSqlRow#new
70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/omf_oml/sql_source.rb', line 70 def create_table(table_name, opts = {}) tn = opts.delete(:name) || table_name schema = opts.delete(:schema) || _schema_for_table(table_name) if q = opts.delete(:query) query = (q.is_a? String) ? @db[q] : q else query = _def_query_for_table(table_name) end r = OmlSqlRow.new(table_name, schema, query, opts) opts[:schema] = schema r.to_table(tn, opts) end |
#dataset(db_table_name) ⇒ Object
Return a Sequel Dataset from ‘table_name’. See Sequel documentation on what one can do with that.
db_table_name Name of table in database
139 140 141 |
# File 'lib/omf_oml/sql_source.rb', line 139 def dataset(db_table_name) @db.from(db_table_name) end |
#on_new_stream(key = :_, &proc) ⇒ Object
Register a proc to be called when a new stream was discovered on this endpoint.
52 53 54 55 56 57 58 |
# File 'lib/omf_oml/sql_source.rb', line 52 def on_new_stream(key = :_, &proc) if proc @on_new_stream_procs[key] = proc else @on_new_stream_procs.delete key end end |
#query(sql, table_name, schema) ⇒ Object
Run a query on the database and return the result as an OmlTable. The provided schema needs to describe the SQL queries result set. Unfortunately we can only do very little sanity checks here
108 109 110 111 112 113 114 |
# File 'lib/omf_oml/sql_source.rb', line 108 def query(sql, table_name, schema) tbl = OmlTable.create(table_name, schema) @db.fetch(sql).each do |row| tbl << schema.hash_to_row(row) end tbl end |
#query2(table_name, schema, &block) ⇒ Object
Run a query on the database and return the result as an OmlTable. The provided schema needs to describe the SQL queries result set. Unfortunately we can only do very little sanity checks here. The query will be defined in the provided block which is passed in the Sequel Database object and is expected to return a Sequel Dataset instance.
122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/omf_oml/sql_source.rb', line 122 def query2(table_name, schema, &block) tbl = OmlTable.create(table_name, schema) q = block.call(@db) unless q.is_a? Sequel::Dataset raise "Expected a Sequel::Dataset object, but got '#{q.class}'" end q.each do |row| tbl << tbl.schema.hash_to_row(row) end tbl end |
#run(check_every = -1)) ⇒ Object
Start checking the database for tables and create a new stream by calling the internal report_new_table
method. If check_every
> 0 continue checking every check_every
seconds for new tables in the database, otherwise it’s only checked once
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/omf_oml/sql_source.rb', line 148 def run(check_every = -1) if check_every <= 0 run_once() else Thread.new do @running = true while (@running) begin run_once() rescue Exception => ex error "Exception in OmlSqlSource#run: #{ex}" debug "Exception in OmlSqlSource#run: #{ex.backtrace.join("\n\t")}" end sleep check_every end end end end |