Class: OMF::OML::OmlSqlRow

Inherits:
OmlTuple show all
Defined in:
lib/omf_oml/sql_row.rb

Overview

Read the content of a table and feeds it out as a tuple store. After creation of the object. The actual tuple feed is started with a call to run.

Instance Attribute Summary collapse

Attributes inherited from Tuple

#schema, #stream_name

Instance Method Summary collapse

Methods inherited from Tuple

#create_table, #parse_tuple

Constructor Details

#initialize(sql_table_name, schema, query, opts = {}) ⇒ OmlSqlRow

Create a representation of a row in a database. Can be used to fill a table.

Parameters:

  • sql_table_name (String)
    • name of SQL table in respective SQL database

  • schema (OmlSchema)
    • the schema describing the tuple

  • query (Sequel::Dataset)
    • Databasequery to execute

  • opts: (Hash)
    • offset: Ignore first offset rows. If negative or zero serve offset rows initially

    • limit: Number of rows to fetch each time [1000]

    • check_interval: Interval in seconds when to check for new data. If < 0, only run once.

    • query_interval: Interval between consecutive queries when processing large result set.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/omf_oml/sql_row.rb', line 33

def initialize(sql_table_name, schema, query, opts = {})
  debug "query:  #{query.sql}"
  @sname = sql_table_name
  @schema = schema
  raise "Expected OmlSchema but got '#{schema.class}'" unless schema.is_a? OmlSchema
  @query = query

  unless @offset = opts[:offset]
    @offset = 0
  end
  @limit = opts[:limit]
  @limit = 1000 unless @limit

  @check_interval = opts[:check_interval]
  @check_interval = -1 unless @check_interval
  @query_interval = opts[:query_interval]

  @on_new_vector_proc = {}

  super opts[:name] || sql_table_name, schema
end

Instance Attribute Details

#rowObject (readonly)

Returns the value of attribute row.



19
20
21
# File 'lib/omf_oml/sql_row.rb', line 19

def row
  @row
end

Instance Method Details

#[](name_or_index) ⇒ Object

Return a specific element of the vector identified either by it’s name, or its col index



59
60
61
62
63
64
65
66
67
68
# File 'lib/omf_oml/sql_row.rb', line 59

def [](name_or_index)
  if name_or_index.is_a? Integer
    @row[@schema.name_at_index(name_or_index)]
  else
    unless @row.key? name_or_index
      raise "Unknown column name '#{name_or_index}'"
    end
    @row[name_or_index]
  end
end

#on_new_tuple(key = :_, &proc) ⇒ Object

Register a proc to be called when a new tuple arrived on this stream.



101
102
103
104
105
106
107
108
# File 'lib/omf_oml/sql_row.rb', line 101

def on_new_tuple(key = :_, &proc)
  if proc
    @on_new_vector_proc[key] = proc
  else
    @on_new_vector_proc.delete key
  end
  run() unless @on_new_vector_proc.empty?
end

#run(in_thread = true) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/omf_oml/sql_row.rb', line 163

def run(in_thread = true)
  return if @running
  if in_thread
    Thread.new do
      begin
        _run
      rescue Exception => ex
        error "Exception in OmlSqlRow: #{ex}"
        debug "Exception in OmlSqlRow: #{ex.backtrace.join("\n\t")}"
      end
    end
  else
    _run
  end
end

#select(*col_names) ⇒ Object

Return an array including the values for the names elements given as parameters.



80
81
82
83
84
85
86
87
88
# File 'lib/omf_oml/sql_row.rb', line 80

def select(*col_names)
  r = @row
  col_names.collect do |n|
    unless @row.key? n
      raise "Unknown column name '#{n}'"
    end
    @row[n]
  end
end

#seq_noObject



94
95
96
# File 'lib/omf_oml/sql_row.rb', line 94

def seq_no
  self[:oml_seq]
end

#to_a(schema = nil) ⇒ Object

Return the elements of the row as an array using the associated schema or ‘schema’ if non-nil.



73
74
75
# File 'lib/omf_oml/sql_row.rb', line 73

def to_a(schema = nil)
  a = (schema || @schema).hash_to_row(@row) # don't need type conversion as sequel is doing this for us
end

#to_stream(opts = {}, &block) ⇒ Object

Return a table which will capture the content of this tuple stream.

Parameters:

  • name (string)
    • Name to use for returned table

  • opts (Hash) (defaults to: {})

    Options to be passed on to Table constructor



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/omf_oml/sql_row.rb', line 117

def to_stream(opts = {}, &block)
  unless schema = opts.delete(:schema)
    include_oml_internals = (opts[:include_oml_internals] != false)
    schema = self.schema.clone(!include_oml_internals)
    if include_oml_internals
      # replace sender_id by sender ... see _run_once
      schema.replace_column_at 0, :oml_sender
    end
  end
  self.on_new_tuple(rand()) do |t|
    #v = t.to_a(schema)
    v = t.row
    block.arity == 1 ? block.call(v) : block.call(v, schema)
  end
  schema
end

#to_table(name = nil, opts = {}) ⇒ Object

Return a table which will capture the content of this tuple stream.

Parameters:

  • name (string) (defaults to: nil)
    • Name to use for returned table

  • opts (Hash) (defaults to: {})

    Options to be passed on to Table constructor



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/omf_oml/sql_row.rb', line 141

def to_table(name = nil, opts = {})
  unless name
    name = @sname
  end
  unless schema = opts.delete(:schema)
    include_oml_internals = (opts[:include_oml_internals] != false)
    schema = self.schema.clone(!include_oml_internals)
    if include_oml_internals
      # replace sender_id by sender ... see _run_once
      schema.replace_column_at 0, :oml_sender
    end
  end
  t = OMF::OML::OmlTable.new(name, schema, opts)
  #puts ">>>>SCHEMA>>> #{schema.inspect}"
  self.on_new_tuple(t) do |v|
    r = v.to_a(schema)
    #puts r.inspect
    t.add_row(r)
  end
  t
end

#tsObject



90
91
92
# File 'lib/omf_oml/sql_row.rb', line 90

def ts
  self[:oml_ts_server]
end