Class: OMF::OML::OmlSqlSource

Inherits:
Base::LObject
  • Object
show all
Defined in:
lib/omf_oml/sql_source.rb

Overview

This class fetches the content of an SQL database and serves it as multiple OML streams.

After creating the object, the @run@ method needs to be called to start producing the streams.

Constant Summary collapse

FALLBACK_MAPPING =

Sequel adaptors sometimes don’t return a :type identifier, but always return the :db_type. This is a list of maps which may not work for all adaptors

{
  'UNSIGNED INTEGER' => :integer,
  'UNSIGNED BIGINT' => :integer
}

Instance Method Summary collapse

Constructor Details

#initialize(db_opts, row_opts = {}) ⇒ OmlSqlSource

db_opts - Options used to create a Sequel adapter

Sequel.connect(:adapter=>‘postgres’, :host=>‘norbit.npc.nicta.com.au’, :user=>‘oml2’, :password=>‘omlisgoodforyou’, :database=>‘openflow-demo’)



37
38
39
40
41
42
43
44
45
46
47
# File 'lib/omf_oml/sql_source.rb', line 37

def initialize(db_opts, row_opts = {})
  OMF::Base::Loggable.init_log('OmlSqlSource')
  @running = false
  @on_new_stream_procs = {}
  @tables = {}
  @db_opts = db_opts
  debug "Opening DB (#{db_opts})"
  @db = Sequel.connect(db_opts)
  debug "DB: #{@db.inspect}"
  @row_opts = row_opts
end

Instance Method Details

#create_stream(table_name, opts = {}, &block) ⇒ Object

Call ‘block’ for every row in ‘table_name’ table.

table_name - Name of table in the SQL database opts -

:schema[Schema] Schema to use for creating row
All other options defined for OmlSqlRow#new

returns OmlSqlRow



91
92
93
94
95
96
97
98
99
100
101
# File 'lib/omf_oml/sql_source.rb', line 91

def create_stream(table_name, opts = {}, &block)
  rschema = opts.delete(:schema)
  schema = _schema_for_table(table_name)
  r = OmlSqlRow.new(table_name, schema, _def_query_for_table(table_name), opts)
  if block
    ropts = {}
    ropts[:schema] = rschema if rschema
    r.to_stream(ropts, &block)
  end
  r
end

#create_table(table_name, opts = {}) ⇒ Object

Return a table (more precisely an OmlTable instance) fed from the content of a table ‘table_name’ in this database.

table_name - Name of table in the SQL database opts -

:name - name used for returned OML Table [table_name]
:schema - Schema to use instead of default table schema
:query - Query to use instead of default one
All other options defined for OmlSqlRow#new


70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/omf_oml/sql_source.rb', line 70

def create_table(table_name, opts = {})
  tn = opts.delete(:name) || table_name
  schema = opts.delete(:schema) || _schema_for_table(table_name)
  if q = opts.delete(:query)
    query = (q.is_a? String) ? @db[q] : q
  else
    query = _def_query_for_table(table_name)
  end
  r = OmlSqlRow.new(table_name, schema, query, opts)
  opts[:schema] = schema
  r.to_table(tn, opts)
end

#dataset(db_table_name) ⇒ Object

Return a Sequel Dataset from ‘table_name’. See Sequel documentation on what one can do with that.

db_table_name Name of table in database



139
140
141
# File 'lib/omf_oml/sql_source.rb', line 139

def dataset(db_table_name)
  @db.from(db_table_name)
end

#on_new_stream(key = :_, &proc) ⇒ Object

Register a proc to be called when a new stream was discovered on this endpoint.



52
53
54
55
56
57
58
# File 'lib/omf_oml/sql_source.rb', line 52

def on_new_stream(key = :_, &proc)
  if proc
    @on_new_stream_procs[key] = proc
  else
    @on_new_stream_procs.delete key
  end
end

#query(sql, table_name, schema) ⇒ Object

Run a query on the database and return the result as an OmlTable. The provided schema needs to describe the SQL queries result set. Unfortunately we can only do very little sanity checks here



108
109
110
111
112
113
114
# File 'lib/omf_oml/sql_source.rb', line 108

def query(sql, table_name, schema)
  tbl = OmlTable.create(table_name, schema)
  @db.fetch(sql).each do |row|
    tbl << schema.hash_to_row(row)
  end
  tbl
end

#query2(table_name, schema, &block) ⇒ Object

Run a query on the database and return the result as an OmlTable. The provided schema needs to describe the SQL queries result set. Unfortunately we can only do very little sanity checks here. The query will be defined in the provided block which is passed in the Sequel Database object and is expected to return a Sequel Dataset instance.



122
123
124
125
126
127
128
129
130
131
132
# File 'lib/omf_oml/sql_source.rb', line 122

def query2(table_name, schema, &block)
  tbl = OmlTable.create(table_name, schema)
  q = block.call(@db)
  unless q.is_a? Sequel::Dataset
    raise "Expected a Sequel::Dataset object, but got '#{q.class}'"
  end
  q.each do |row|
    tbl << tbl.schema.hash_to_row(row)
  end
  tbl
end

#run(check_every = -1)) ⇒ Object

Start checking the database for tables and create a new stream by calling the internal report_new_table method. If check_every > 0 continue checking every check_every seconds for new tables in the database, otherwise it’s only checked once



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/omf_oml/sql_source.rb', line 148

def run(check_every = -1)
  if check_every <= 0
    run_once()
  else
    Thread.new do
      @running = true
      while (@running)
        begin
          run_once()
        rescue Exception => ex
          error "Exception in OmlSqlSource#run: #{ex}"
          debug "Exception in OmlSqlSource#run: #{ex.backtrace.join("\n\t")}"
        end
        sleep check_every
      end
    end
  end
end