Class: Sneaql::Transform

Inherits:
Object
  • Object
show all
Defined in:
lib/sneaql.rb

Overview

Manages and executes a SneaQL transform.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params, logger = nil) ⇒ Transform

Creates a SneaQL transform object.

Examples:

t=Sneaql::Transform.new({
  transform_name: 'test-transform',
  repo_base_dir: "test/fixtures/test-transform",
  repo_type: 'local',
  jdbc_url: 'jdbc:sqlite:memory',
  db_user: 'dbuser',
  db_pass: 'password',
  step_metadata_manager_type: 'local_file',
  step_metadata_file_path: "test/fixtures/test-transform/steps.json"
}, logger)

t.run

Parameters:

  • params (Hash)

    various parameters are passed to define the transform

  • logger (Logger) (defaults to: nil)

    customer logger if provided (otherwise default logger is created)



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/sneaql.rb', line 59

def initialize(params, logger = nil)
  # initialzing and basic parameter stuff
  @logger = logger ? logger : Logger.new(STDOUT)
  @start_time = Time.new.utc
  current_status(:initializing)
  @params = params
  @exit_code = 0
  @transform_name = @params[:transform_name]
  @jdbc_url = @params[:jdbc_url]
  @db_user = @params[:db_user]
  @db_pass = @params[:db_pass]

  # build fancy objects for processing the transform
  @expression_handler = create_expression_handler
  @recordset_manager = create_recordset_manager
  @exception_manager = create_exception_manager
  @repo_manager = create_repo_manager
  @steps = 
  @parsed_steps = create_parsed_steps(@steps)

  run if @params[:run] == true
end

Instance Attribute Details

#current_statementObject (readonly)

Returns the value of attribute current_statement.



19
20
21
# File 'lib/sneaql.rb', line 19

def current_statement
  @current_statement
end

#current_stepObject (readonly)

Returns the value of attribute current_step.



18
19
20
# File 'lib/sneaql.rb', line 18

def current_step
  @current_step
end

#end_timeObject (readonly)

Returns the value of attribute end_time.



21
22
23
# File 'lib/sneaql.rb', line 21

def end_time
  @end_time
end

#exit_codeObject (readonly)

Returns the value of attribute exit_code.



22
23
24
# File 'lib/sneaql.rb', line 22

def exit_code
  @exit_code
end

#start_timeObject (readonly)

Returns the value of attribute start_time.



20
21
22
# File 'lib/sneaql.rb', line 20

def start_time
  @start_time
end

#statusObject (readonly)

Returns the value of attribute status.



24
25
26
# File 'lib/sneaql.rb', line 24

def status
  @status
end

#transform_errorObject (readonly)

Returns the value of attribute transform_error.



23
24
25
# File 'lib/sneaql.rb', line 23

def transform_error
  @transform_error
end

Instance Method Details

#create_exception_managerObject

Creates ExceptionHandler object



149
150
151
# File 'lib/sneaql.rb', line 149

def create_exception_manager
  Sneaql::Exceptions::ExceptionManager.new(@logger)
end

#create_expression_handlerSneaql::Core::ExpressionHandler

Creates an ExpressionHandler object



144
145
146
# File 'lib/sneaql.rb', line 144

def create_expression_handler
  Sneaql::Core::ExpressionHandler.new(@logger)
end

#create_jdbc_connectionObject

Creates a JDBC connection JDBC drivers must loaded into jruby before this will work.



203
204
205
206
207
208
209
210
211
212
# File 'lib/sneaql.rb', line 203

def create_jdbc_connection
  # db specific driver should have been handled by the calling procedure
  current_status(:connecting_to_database)
  JDBCHelpers::ConnectionFactory.new(
    @jdbc_url,
    @db_user,
    @db_pass,
    @logger
  ).connection
end

#create_metadata_managerSneaql::Core::StepMetadataManager

Creates a StepMetadataManager object The actual object returns depends upon params provided at initialize.



163
164
165
166
167
168
# File 'lib/sneaql.rb', line 163

def 
  Sneaql::Core.find_class(
    :step_metadata_manager,
    @params[:step_metadata_manager_type]
  ).new(@params, @logger).steps
end

#create_parsed_steps(steps) ⇒ Array

Creates a StepParser object for each step file defined by the metadata manager.

Parameters:

  • steps (Array)

    takes an array of step definitions

Returns:

  • (Array)

    of Sneaql::Core::StepParser



173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/sneaql.rb', line 173

def create_parsed_steps(steps)
  steps.map do |s|
    {
      parser: Sneaql::Core::StepParser.new(
        "#{@repo_manager.repo_base_dir}/#{s[:step_file]}",
        @expression_handler,
        @recordset_manager,
        @logger
      ),
      step_number: s[:step_number]
    }
  end
end

#create_recordset_managerSneaql::Core::RecordsetManager

Creates an RecordsetManager object



197
198
199
# File 'lib/sneaql.rb', line 197

def create_recordset_manager
  Sneaql::Core::RecordsetManager.new(@expression_handler, @logger)
end

#create_repo_managerSneaql::Core::RepoDownloadManager

Creates a RepoDownloadManager object The actual object returns depends upon params provided at initialize.



156
157
158
# File 'lib/sneaql.rb', line 156

def create_repo_manager
  Sneaql::Core.find_class(:repo_manager, @params[:repo_type]).new(@params, @logger)
end

#current_status(status) ⇒ Object

Sets the current status of the transform. Must be a valid status type or it will not be set. Override this if you want to implement a custom status communication to an external target.

Parameters:

  • status (Symbol)

See Also:

  • valid_status


39
40
41
# File 'lib/sneaql.rb', line 39

def current_status(status)
  @status = status if valid_statuses.include?(status)
end

#iterate_steps_and_statementsObject

Performs the actual work of running the transform steps. This method operates within the context of a single database session across all steps. If it fails, it will not rollback automatically unless that is the default RDBMS behavior for a connection that closes before a commit.



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/sneaql.rb', line 219

def iterate_steps_and_statements
  @parsed_steps.each_with_index do |this_step, i|
    # raise any lingering errors not handled in previous step
    raise @exception_manager.pending_error if @exception_manager.pending_error != nil
    # set this so that other processes can poll the state
    @current_step = this_step[:step_number]
    # within a step... iterate through each statement
    this_step[:parser].statements.each_with_index do |this_stmt, stmt_index|
      # set this so that other processes can poll the state
      @current_statement = stmt_index + 1

      # log the pending error
      @exception_manager.output_pending_error

      # log some useful info
      @logger.info("step: #{@current_step} statement: #{@current_statement}")
      @expression_handler.output_all_session_variables

      # get the command hash for the current statement
      this_cmd = this_step[:parser].command_at_index(stmt_index)
      @logger.debug(this_cmd)

      # evaluate any variable references in the arguments
      if this_cmd[:arguments]
        this_cmd[:arguments].map! { |a| @expression_handler.evaluate_expression(a) }
      end

      begin
        # find the class assciated with this command
        k = Sneaql::Core.find_class(:command, this_cmd[:command])

        # if there is an error... check to see if this is the error handler
        if @exception_manager.pending_error != nil
          unless k == Sneaql::Core::Commands::SneaqlOnError
            raise @exception_manager.pending_error
          end
        end

        # instantiate a new instance of the command class
        # and call it's action method with arguments
        c = k.new(
          @jdbc_connection,
          @expression_handler,
          @exception_manager,
          @recordset_manager,
          @expression_handler.evaluate_all_expressions(this_stmt),
          @logger
        )

        # performs the work of the current command
        c.action(*this_cmd[:arguments])

        # check if there was an error from the action
        if @exception_manager.pending_error != nil
          # if there was an error... check to see if this is the last stmt in step
          if stmt_index == (this_step[:parser].statements.length - 1)
            # last step... so we know there is no error handler in this step
            # therefore we should propagate the error
            raise @exception_manager.pending_error
          end
        end

      rescue Sneaql::Exceptions::SQLTestStepExitCondition => e
        @logger.info e.message
        break
      end
    end
  end
rescue Sneaql::Exceptions::SQLTestStepExitCondition, Sneaql::Exceptions::SQLTestExitCondition
  @logger.info("SQLTest Exception Handled, continuing")
rescue => e
  @logger.error(e.message)
  e.backtrace.each { |r| @logger.error(r) }
  raise e
end

#runObject

Runs the actual transform.



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/sneaql.rb', line 112

def run
  current_status(:validating)
  validate_parsed_steps(@parsed_steps)
  current_status(:connecting_to_database)
  @jdbc_connection = create_jdbc_connection
  current_status(:running)
  iterate_steps_and_statements
rescue Sneaql::Exceptions::SQLTestExitCondition => e
  @transform_error = nil
  @logger.info(e.message)
rescue => e
  @exit_code = 1
  @transform_error = e
  current_status(:error)
  @logger.error(e.message)
  e.backtrace { |b| @logger.error b }
ensure
  @jdbc_connection.close if @jdbc_connection
  @end_time = Time.new.utc

  if @transform_error.nil?
    current_status(:completed)
  else
    current_status(:error)
  end

  @logger.info("#{@transform_name} runtime #{@end_time - @start_time}s")
  @logger.info("#{@transform_name} exit code: #{@exit_code} status: #{@status}")
end

#valid_statusesArray

Valid transform statuses :initializing, :connecting_to_database, :running, :completed, :error

Returns:

  • (Array)

    array of valid transform statuses



29
30
31
# File 'lib/sneaql.rb', line 29

def valid_statuses
  [:initializing, :connecting_to_database, :running, :completed, :error, :validating, :validated]
end

#validateObject

validate the transform.



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/sneaql.rb', line 83

def validate
  current_status(:validating)
  validate_parsed_steps(@parsed_steps)
rescue Sneaql::Exceptions::TransformIsLocked => e
  @transform_error = e
  @logger.info(e.message)
rescue Sneaql::Exceptions::SQLTestExitCondition => e
  @transform_error = nil
  @logger.info(e.message)
rescue => e
  @exit_code = 1
  @transform_error = e
  current_status(:error)
  @logger.error(e.message)
  e.backtrace { |b| @logger.error b }
ensure
  @end_time = Time.new.utc

  if @transform_error.nil?
    current_status(:validated)
  else
    current_status(:error)
  end

  @logger.info("#{@transform_name} validation time #{@end_time - @start_time}s")
  @logger.info("#{@transform_name} exit code: #{@exit_code} status: #{@status}")
end

#validate_parsed_steps(steps) ⇒ Object

Validates the arguments for all tags.

Parameters:



189
190
191
192
193
# File 'lib/sneaql.rb', line 189

def validate_parsed_steps(steps)
  steps.each do |s|
    raise Sneaql::Exceptions::StatementParsingError unless s[:parser].valid_arguments_in_all_statements?
  end
end