Class: Bricolage::StreamingLoad::TaskHandler

Inherits:
Bricolage::SQSDataSource::MessageHandler show all
Defined in:
lib/bricolage/streamingload/taskhandler.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Bricolage::SQSDataSource::MessageHandler

#after_message_batch, #call_handler_method, #handle, #handleable?, #handler_method

Constructor Details

#initialize(context:, ctl_ds:, data_ds:, log_table:, task_queue:, working_dir:, logger:, job_class: Job) ⇒ TaskHandler

Returns a new instance of TaskHandler.



97
98
99
100
101
102
103
104
105
106
# File 'lib/bricolage/streamingload/taskhandler.rb', line 97

# Stores the collaborators handed to the task handler.
#
# @param context stored as @ctx and passed to every job built by #new_job
# @param ctl_ds stored as @ctl_ds and passed to every job
# @param data_ds stored as @data_ds and passed to every job
# @param log_table stored as @log_table and passed to every job
# @param task_queue queue used to receive and delete task messages
# @param working_dir directory entered while a queued task is executed
# @param logger logger used by the handler and passed to every job
# @param job_class class instantiated by #new_job (defaults to Job)
def initialize(context:, ctl_ds:, data_ds:, log_table:, task_queue:, working_dir:, logger:, job_class: Job)
  # Used by the handler itself.
  @task_queue, @working_dir = task_queue, working_dir
  # Forwarded to each job.
  @ctx, @ctl_ds, @data_ds = context, ctl_ds, data_ds
  @log_table, @logger = log_table, logger
  @job_class = job_class
end

Instance Attribute Details

#job_class ⇒ Object



141
142
143
# File 'lib/bricolage/streamingload/taskhandler.rb', line 141

# Returns the class used to build jobs, falling back to Job (and caching
# that choice in @job_class) when none is currently set.
def job_class
  @job_class = Job unless @job_class
  @job_class
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



108
109
110
# File 'lib/bricolage/streamingload/taskhandler.rb', line 108

# Returns the logger stored in @logger (set by the constructor).
def logger
  @logger
end

Class Method Details

.create_pid_file(path) ⇒ Object



89
90
91
92
93
94
95
# File 'lib/bricolage/streamingload/taskhandler.rb', line 89

# Writes the current process ID, followed by a newline, to +path+.
#
# This is best effort: any StandardError raised while writing is
# swallowed. Always returns nil.
def TaskHandler.create_pid_file(path)
  File.write(path, "#{Process.pid}\n")
  nil
rescue StandardError
  # A missing PID file is tolerated on purpose.
  nil
end

.main ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/bricolage/streamingload/taskhandler.rb', line 18

# Command-line entry point of the task handler.
#
# Parses ARGV with TaskHandlerOptions, loads the YAML configuration, builds
# the context, data sources and logger, and then either executes a single
# task (when a task ID option was given) or runs the message loop.
#
# At most one positional argument is accepted (the configuration file path);
# with more, the usage text is printed to stderr and the process exits with
# status 1. Without it, the path defaults to
# "<working_dir>/config/<environment>/streamingload.yml".
def TaskHandler.main
  opts = TaskHandlerOptions.new(ARGV)
  opts.parse
  if opts.rest_arguments.size > 1
    $stderr.puts opts.usage
    exit 1
  end
  config_path = opts.rest_arguments[0] || "#{opts.working_dir}/config/#{opts.environment}/streamingload.yml"
  # An empty or comment-only YAML file does not parse to a Hash (the result
  # is falsy). Every setting read below has a default, so treat such a file
  # as "no overrides" instead of failing on the first config.fetch.
  config = YAML.load(File.read(config_path)) || {}

  # Without a log file path no logger is passed to the context.
  logger = opts.log_file_path ? new_logger(opts.log_file_path, config) : nil
  ctx = Context.for_application(opts.working_dir, environment: opts.environment, logger: logger)

  ctl_ds = ctx.get_data_source('sql', config.fetch('ctl-postgres-ds', 'db_ctl'))
  data_ds = ctx.get_data_source('sql', config.fetch('redshift-ds', 'db_data'))
  task_queue = ctx.get_data_source('sqs', config.fetch('task-queue-ds', 'sqs_task'))
  log_table = config.fetch('log-table', 'strload_load_logs')
  # The alerting logger is only used when 'alert-level' is configured.
  service_logger =
    if config.key?('alert-level')
      new_alerting_logger(ctx, config)
    else
      ctx.logger
    end

  task_handler = new(
    context: ctx,
    ctl_ds: ctl_ds,
    data_ds: data_ds,
    log_table: log_table,
    task_queue: task_queue,
    working_dir: opts.working_dir,
    logger: service_logger,
    job_class: opts.noop? ? NoopJob : Job
  )

  if opts.task_id
    # Single task mode
    task_handler.execute_task_by_id opts.task_id, force: opts.force?
  else
    # Server mode
    # Process.daemon(true) detaches without changing directory; the move to
    # '/' below happens for both daemonized and foreground runs.
    Process.daemon(true) if opts.daemon?
    Dir.chdir '/'
    create_pid_file opts.pid_file_path if opts.pid_file_path
    begin
      service_logger.info "*** bricolage-streaming-loader started: pid=#{$$}"
      task_handler.event_loop
      service_logger.info "*** bricolage-streaming-loader shutdown gracefully: pid=#{$$}"
    rescue Exception => ex
      # Log whatever ended the loop, then re-raise so it still propagates.
      service_logger.exception(ex)
      service_logger.error "*** bricolage-streaming-loader abort: pid=#{$$}"
      raise
    end
  end
end

.new_alerting_logger(ctx, config) ⇒ Object



81
82
83
84
85
86
87
# File 'lib/bricolage/streamingload/taskhandler.rb', line 81

# Builds an AlertingLogger on top of the context's logger.
#
# @param ctx object providing #logger and #get_data_source
# @param config configuration read with #fetch; 'sns-ds' (default 'sns')
#   names the SNS data source and 'alert-level' (default 'warn') is passed
#   through as the alert level
def TaskHandler.new_alerting_logger(ctx, config)
  base_logger = ctx.logger
  sns_ds = ctx.get_data_source('sns', config.fetch('sns-ds', 'sns'))
  level = config.fetch('alert-level', 'warn')
  AlertingLogger.new(logger: base_logger, sns_datasource: sns_ds, alert_level: level)
end

.new_logger(path, config) ⇒ Object



73
74
75
76
77
78
79
# File 'lib/bricolage/streamingload/taskhandler.rb', line 73

# Builds a Logger that writes to +path+.
#
# @param path passed to Logger.new as the device
# @param config configuration read with #fetch; 'log-rotation-period'
#   (default 'daily') and 'log-rotation-size' (default nil) are passed
#   through as the rotation settings
def TaskHandler.new_logger(path, config)
  period = config.fetch('log-rotation-period', 'daily')
  size = config.fetch('log-rotation-size', nil)
  Logger.new(device: path, rotation_period: period, rotation_size: size)
end

Instance Method Details

#event_loop ⇒ Object



115
116
117
# File 'lib/bricolage/streamingload/taskhandler.rb', line 115

# Hands control to the task queue, registering this object as the handler
# and Task as the message class. Returns whatever the queue returns.
def event_loop
  queue = @task_queue
  queue.handle_messages(handler: self, message_class: Task)
end

#execute_task_by_id(task_id, force: false) ⇒ Object



110
111
112
113
# File 'lib/bricolage/streamingload/taskhandler.rb', line 110

# Builds a job for +task_id+ and executes it with fail_fast enabled.
#
# @param task_id identifier passed to #new_job
# @param force passed to #new_job as the force flag
# @return whatever the job's #execute returns
def execute_task_by_id(task_id, force: false)
  new_job(task_id, force).execute(fail_fast: true)
end

#handle_streaming_load_v3(t) ⇒ Object

message handler



126
127
128
129
130
131
132
133
134
135
# File 'lib/bricolage/streamingload/taskhandler.rb', line 126

# Message handler: runs the job described by task message +t+ from inside
# @working_dir and deletes the message from the queue only when the job's
# #execute returns a truthy value.
#
# Any exception raised along the way is passed to @logger.exception and
# not re-raised, so the message is left on the queue in that case.
def handle_streaming_load_v3(t)
  Dir.chdir(@working_dir) do
    succeeded = new_job(t.task_id, t.force?).execute
    @task_queue.delete_message(t) if succeeded
  end
rescue Exception => err
  @logger.exception(err)
end

#handle_unknown(t) ⇒ Object

message handler



120
121
122
123
# File 'lib/bricolage/streamingload/taskhandler.rb', line 120

# Message handler: logs a warning containing the message body of +t+ and
# then deletes the message from the task queue.
def handle_unknown(t)
  body = t.message_body
  @logger.warn("unknown task: #{body}")
  @task_queue.delete_message(t)
end

#new_job(task_id, force) ⇒ Object



137
138
139
# File 'lib/bricolage/streamingload/taskhandler.rb', line 137

# Instantiates @job_class for the given task, passing along the handler's
# context, data sources, log table and logger.
#
# @param task_id passed to the job as task_id:
# @param force passed to the job as force:
# @return the new job instance
def new_job(task_id, force)
  job_args = {
    context: @ctx,
    ctl_ds: @ctl_ds,
    data_ds: @data_ds,
    log_table: @log_table,
    task_id: task_id,
    force: force,
    logger: @logger
  }
  @job_class.new(**job_args)
end