Class: OMF::OML::OmlTable

Inherits:
Base::LObject
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/omf_oml/table.rb

Overview

This class represents a database like table holding a sequence of OML measurements (rows) according a common schema.

Direct Known Subclasses

OmlCsvTable, OmlIndexedTable

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(tname, schema, opts = {}, &on_before_row_added) ⇒ OmlTable

tname - Name of table schema - OmlSchema or Array containing [name, type*] for every column in table

Table adds a '__id__' column at the beginning which keeps track of the rows unique id unless
option 'supress_index' is set.

opts -

:max_size - keep table to that size by dropping older rows
:supress_index - don't add index, even if schema doesn't start with '__id__'


49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/omf_oml/table.rb', line 49

def initialize(tname, schema, opts = {}, &on_before_row_added)
  super tname
  #@endpoint = endpoint
  @name = tname
  @schema = OmlSchema.create(schema)
  @add_index = false
  unless opts[:supress_index]
    unless @schema.name_at(0) == :__id__
      @add_index = true
      @schema.insert_column_at(0, [:__id__, 'int'])
    end
  end
  @col_count = @schema.columns.length
  @opts = opts
  if (index = opts[:index])
    throw "No longer supported, use IndexedTable instead"
    # @indexed_rows = {}
    # @index_col = @schema.index_for_col(index)
  end
  @on_before_row_added = on_before_row_added
  @offset = 0 # number of rows skipped before the first one recorded here
  @rows = []
  @row_id = 0 # Each new row is assigned an id
  @max_size = opts[:max_size]
  @on_content_changed = {}
end

Instance Attribute Details

#max_sizeObject

Returns the value of attribute max_size.



36
37
38
# File 'lib/omf_oml/table.rb', line 36

def max_size
  @max_size
end

#nameObject (readonly)

Returns the value of attribute name.



35
36
37
# File 'lib/omf_oml/table.rb', line 35

def name
  @name
end

#offsetObject (readonly)

Returns the value of attribute offset.



38
39
40
# File 'lib/omf_oml/table.rb', line 38

def offset
  @offset
end

#schemaObject (readonly)

Returns the value of attribute schema.



37
38
39
# File 'lib/omf_oml/table.rb', line 37

def schema
  @schema
end

Class Method Details

.create(tname, schema, opts = {}, &on_before_row_added) ⇒ Object

Parameters:

  • opts (defaults to: {})


25
26
27
28
29
30
31
32
# File 'lib/omf_oml/table.rb', line 25

def self.create(tname, schema, opts = {}, &on_before_row_added)
  if (index = opts.delete(:index))
    require 'omf_oml/indexed_table'
    OmlIndexedTable.new(tname, index, schema, &on_before_row_added)
  else
    OmlTable.new(tname, schema, opts, &on_before_row_added)
  end
end

Instance Method Details

#<<(row) ⇒ Object



134
135
136
# File 'lib/omf_oml/table.rb', line 134

def <<(row)
  add_row(row)
end

#add_row(row, needs_casting = false) ⇒ Object

NOTE: on_row_added callbacks are done within the monitor.



126
127
128
129
130
131
132
# File 'lib/omf_oml/table.rb', line 126

def add_row(row, needs_casting = false)
  synchronize do
    if row = _add_row(row, needs_casting)
      _notify_content_changed(:added, [row])
    end
  end
end

#add_rows(rows, needs_casting = false) ⇒ Object

Add an array of rows to this table



151
152
153
154
155
156
157
158
159
# File 'lib/omf_oml/table.rb', line 151

def add_rows(rows, needs_casting = false)
  synchronize do
    added = rows.map { |row| _add_row(row, needs_casting) }
    added = added.compact
    unless added.empty?
      _notify_content_changed(:added, added)
    end
  end
end

#create_sliced_table(col_name, col_value, table_opts = {}) ⇒ Object

Return a new table which only contains the rows of this table whose value in column ‘col_name’ is equal to ‘col_value’



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/omf_oml/table.rb', line 164

def create_sliced_table(col_name, col_value, table_opts = {})
  sname = "#{@name}_slice_#{Kernel.rand}"

  st = self.class.new(name, @schema, @opts.merge(table_opts))
  st.instance_variable_set(:@sname, sname)
  st.instance_variable_set(:@master_ds, self)
  st.instance_variable_set(:@add_index, true)
  def st.release
    @master_ds.on_content_changed(@sname) # release callback
  end

  index = @schema.index_for_col(col_name)
  on_content_changed(sname, 0) do |action, rows|
    if action == :removed
      warn "No support for removing rows from sliced table '#{sname}'."
      next
    end
    first_row = true
    rows.each do |row|
      if row[index] == col_value
        row = row[1 .. -1] # remove the row_id
        debug "Add first row '#{row.inspect}'" if first_row
        st.add_row(row)
        first_row = false
      end
    end
  end
  debug "Created sliced table from '#{@name}' (rows: #{st.rows.length}-#{@rows.length})"
  st
end

#data_sourcesObject



205
206
207
# File 'lib/omf_oml/table.rb', line 205

def data_sources
  self
end

#describeObject



201
202
203
# File 'lib/omf_oml/table.rb', line 201

def describe()
  rows
end

#indexed_by(col_name) ⇒ Object

Return a new table which shadows this table but only contains rows with unique values in the column ‘col_name’ and of these the latest added rows to this table.

col_name - Name of column to use for indexing



144
145
146
147
# File 'lib/omf_oml/table.rb', line 144

def indexed_by(col_name)
  require 'omf_oml/indexed_table'
  OmlIndexedTable.shadow(self, col_name)
end

#on_before_row_added(&callback) ⇒ Object

Register callback to be called to process any newly offered row before it being added to internal storage. The callback’s argument is the new row (TODO: in what form) and should return what is being added instead of the original row. If the callback returns nil, nothing is being added.



87
88
89
# File 'lib/omf_oml/table.rb', line 87

def on_before_row_added(&callback)
  @on_before_row_added = callback
end

#on_content_changed(key = nil, offset = -1,, &proc) ⇒ Object

Register callback for when the content of the table is changes. The key allows for the callback to be removed by calling this method without a block. . If the optional ‘offset’ value is set to zero or a positive value, then the currently stored values starting at this index are being immediately sent to ‘proc’. The ‘proc’ is expected to receive two parameters, an ‘action’ and the content changed. The ‘action’ is either ‘:added’, or ‘:removed’ and the content is an array of rows.



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/omf_oml/table.rb', line 100

def on_content_changed(key = nil, offset = -1, &proc)
  #puts ">>>>>>> #{offset}"
  if proc
    @on_content_changed[key || proc] = proc
    if offset >= 0
      #with_offset = proc.arity == 2
      proc.call(:added, @rows[offset .. -1])
      #.each_with_index do |r, i|
        # with_offset ? proc.call(r, offset + i) : proc.call(r)
      # end
    end
  else
    @on_content_changed.delete key
  end
end

#on_row_added(key, &block) ⇒ Object



116
117
118
119
120
121
122
# File 'lib/omf_oml/table.rb', line 116

def on_row_added(key, &block)
  on_content_changed(key) do |action, rows|
    if action == :added
      rows.each {|r| block.call(r)}
    end
  end
end

#rowsObject



76
77
78
79
# File 'lib/omf_oml/table.rb', line 76

def rows
  #@indexed_rows ? @indexed_rows.values : @rows
  @rows
end

#to_aObject

Return table as an array of rows



197
198
199
# File 'lib/omf_oml/table.rb', line 197

def to_a
  @rows.dup
end