Class: OMF::OML::OmlSqlRow
- Defined in:
- lib/omf_oml/sql_row.rb
Overview
Read the content of a table and feeds it out as a tuple store. After creation of the object. The actual tuple feed is started with a call to run
.
Instance Attribute Summary collapse
-
#row ⇒ Object
readonly
Returns the value of attribute row.
Attributes inherited from Tuple
Instance Method Summary collapse
-
#[](name_or_index) ⇒ Object
Return a specific element of the vector identified either by it’s name, or its col index.
-
#initialize(sql_table_name, schema, query, opts = {}) ⇒ OmlSqlRow
constructor
Create a representation of a row in a database.
-
#on_new_tuple(key = :_, &proc) ⇒ Object
Register a proc to be called when a new tuple arrived on this stream.
- #run(in_thread = true) ⇒ Object
-
#select(*col_names) ⇒ Object
Return an array including the values for the names elements given as parameters.
- #seq_no ⇒ Object
-
#to_a(schema = nil) ⇒ Object
Return the elements of the row as an array using the associated schema or ‘schema’ if non-nil.
-
#to_stream(opts = {}, &block) ⇒ Object
Return a table which will capture the content of this tuple stream.
-
#to_table(name = nil, opts = {}) ⇒ Object
Return a table which will capture the content of this tuple stream.
- #ts ⇒ Object
Methods inherited from Tuple
Constructor Details
#initialize(sql_table_name, schema, query, opts = {}) ⇒ OmlSqlRow
Create a representation of a row in a database. Can be used to fill a table.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/omf_oml/sql_row.rb', line 33 def initialize(sql_table_name, schema, query, opts = {}) debug "query: #{query.sql}" @sname = sql_table_name @schema = schema raise "Expected OmlSchema but got '#{schema.class}'" unless schema.is_a? OmlSchema @query = query unless @offset = opts[:offset] @offset = 0 end @limit = opts[:limit] @limit = 1000 unless @limit @check_interval = opts[:check_interval] @check_interval = -1 unless @check_interval @query_interval = opts[:query_interval] @on_new_vector_proc = {} super opts[:name] || sql_table_name, schema end |
Instance Attribute Details
#row ⇒ Object (readonly)
Returns the value of attribute row.
19 20 21 |
# File 'lib/omf_oml/sql_row.rb', line 19 def row @row end |
Instance Method Details
#[](name_or_index) ⇒ Object
Return a specific element of the vector identified either by it’s name, or its col index
59 60 61 62 63 64 65 66 67 68 |
# File 'lib/omf_oml/sql_row.rb', line 59 def [](name_or_index) if name_or_index.is_a? Integer @row[@schema.name_at_index(name_or_index)] else unless @row.key? name_or_index raise "Unknown column name '#{name_or_index}'" end @row[name_or_index] end end |
#on_new_tuple(key = :_, &proc) ⇒ Object
Register a proc to be called when a new tuple arrived on this stream.
101 102 103 104 105 106 107 108 |
# File 'lib/omf_oml/sql_row.rb', line 101 def on_new_tuple(key = :_, &proc) if proc @on_new_vector_proc[key] = proc else @on_new_vector_proc.delete key end run() unless @on_new_vector_proc.empty? end |
#run(in_thread = true) ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/omf_oml/sql_row.rb', line 163 def run(in_thread = true) return if @running if in_thread Thread.new do begin _run rescue Exception => ex error "Exception in OmlSqlRow: #{ex}" debug "Exception in OmlSqlRow: #{ex.backtrace.join("\n\t")}" end end else _run end end |
#select(*col_names) ⇒ Object
Return an array including the values for the names elements given as parameters.
80 81 82 83 84 85 86 87 88 |
# File 'lib/omf_oml/sql_row.rb', line 80 def select(*col_names) r = @row col_names.collect do |n| unless @row.key? n raise "Unknown column name '#{n}'" end @row[n] end end |
#seq_no ⇒ Object
94 95 96 |
# File 'lib/omf_oml/sql_row.rb', line 94 def seq_no self[:oml_seq] end |
#to_a(schema = nil) ⇒ Object
Return the elements of the row as an array using the associated schema or ‘schema’ if non-nil.
73 74 75 |
# File 'lib/omf_oml/sql_row.rb', line 73 def to_a(schema = nil) a = (schema || @schema).hash_to_row(@row) # don't need type conversion as sequel is doing this for us end |
#to_stream(opts = {}, &block) ⇒ Object
Return a table which will capture the content of this tuple stream.
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/omf_oml/sql_row.rb', line 117 def to_stream(opts = {}, &block) unless schema = opts.delete(:schema) include_oml_internals = (opts[:include_oml_internals] != false) schema = self.schema.clone(!include_oml_internals) if include_oml_internals # replace sender_id by sender ... see _run_once schema.replace_column_at 0, :oml_sender end end self.on_new_tuple(rand()) do |t| #v = t.to_a(schema) v = t.row block.arity == 1 ? block.call(v) : block.call(v, schema) end schema end |
#to_table(name = nil, opts = {}) ⇒ Object
Return a table which will capture the content of this tuple stream.
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/omf_oml/sql_row.rb', line 141 def to_table(name = nil, opts = {}) unless name name = @sname end unless schema = opts.delete(:schema) include_oml_internals = (opts[:include_oml_internals] != false) schema = self.schema.clone(!include_oml_internals) if include_oml_internals # replace sender_id by sender ... see _run_once schema.replace_column_at 0, :oml_sender end end t = OMF::OML::OmlTable.new(name, schema, opts) #puts ">>>>SCHEMA>>> #{schema.inspect}" self.on_new_tuple(t) do |v| r = v.to_a(schema) #puts r.inspect t.add_row(r) end t end |
#ts ⇒ Object
90 91 92 |
# File 'lib/omf_oml/sql_row.rb', line 90 def ts self[:oml_ts_server] end |