Class: Fluent::PostgresqlQueryBasedFlydataInput

Inherits:
Input
  • Object
show all
Includes:
PostgresqlQueryBasedSyncPreference
Defined in:
lib/flydata/fluent-plugins/in_postgresql_query_based_flydata.rb

Constant Summary collapse

SOURCE_POSITION_FILE_CLASS =
Flydata::SourcePostgresql::PluginSupport::SourcePositionFile

Instance Method Summary collapse

Methods included from PostgresqlQueryBasedSyncPreference

included

Instance Method Details

#configure(conf) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/flydata/fluent-plugins/in_postgresql_query_based_flydata.rb', line 22

# Configures the plugin and builds the objects used for query-based sync.
#
# After the parent class has processed +conf+, this installs the Rollbar
# hook, derives the PostgreSQL connection options from the data entry
# preference, logs the connection settings, and creates the table metadata,
# the plugin context and the sync client (kept in @table_meta, @context and
# @client respectively).
#
# @param conf the plugin configuration; forwarded unchanged to +super+
# @return [Object] the newly created sync client
def configure(conf)
  super

  Flydata::RollbarHookSetup.new($log).setup

  # NOTE(review): @data_entry and the connection-related instance variables
  # read below are not assigned in this method; presumably they are populated
  # by +super+ or the included preference module -- confirm.
  preference = @data_entry['postgresql_data_entry_preference']
  @dbconf = FlydataCore::Postgresql::Config.opts_for_pg(preference)
  $log.info "postgresql host:\"#{@host}\" port:\"#{@port}\" username:\"#{@username}\" database:\"#{@database}\" tables:\"#{@tables}\" tables_append_only:\"#{@tables_append_only}\""

  # Table metadata gets the connection options plus the primary-key override.
  meta_opts = @dbconf.merge(pk_override: preference['pk_override'])
  @table_meta = Flydata::SourcePostgresql::TableMeta.new(meta_opts, @tables, @schema)

  sync_params = {
    fetch_interval: @fetch_interval,
    retry_interval: @retry_interval,
    emit_chunk_limit: @emit_chunk_limit,
  }

  @context = Flydata::SourcePostgresql::PluginSupport::Context.new(
    database: @database,
    tables: @tables,
    tag: @tag,
    sync_fm: @sync_fm,
    omit_events: @omit_events,
    table_revs: @table_revs,
    dbconf: @dbconf,
    cur_src_pos_file: @source_position_file,
    cur_sent_pos_file: @sent_position_file,
    table_src_pos_files: @table_src_pos_files,
    table_meta: @table_meta,
    params: sync_params,
  )

  # NOTE(review): the meaning of the second positional argument (true) is not
  # visible from this file -- check QueryBasedSync::Client#initialize.
  @client = Flydata::SourcePostgresql::QueryBasedSync::Client.new(@context, true)
end

#run ⇒ Object



57
58
59
# File 'lib/flydata/fluent-plugins/in_postgresql_query_based_flydata.rb', line 57

# Body of the worker thread: hands control to the sync client.
#
# @return [Object] whatever the client's #start returns
def run
  sync_client = @client
  sync_client.start
end

#shutdown ⇒ Object



61
62
63
64
65
66
67
# File 'lib/flydata/fluent-plugins/in_postgresql_query_based_flydata.rb', line 61

# Stops the worker thread (if one is still running) and then lets the
# parent class perform its own shutdown.
#
# When the thread is alive, the client is asked to stop first and the
# thread is joined so that this method only proceeds once it has finished.
#
# @return [Object] the result of the parent class's #shutdown
def shutdown
  worker = @thread
  if worker && worker.alive?
    @client.stop_request
    worker.join
  end
  super
end

#start ⇒ Object



52
53
54
55
# File 'lib/flydata/fluent-plugins/in_postgresql_query_based_flydata.rb', line 52

# Starts the plugin: runs the parent class's #start, then spawns a thread
# that executes #run and remembers it in @thread.
#
# @return [Thread] the newly created worker thread
def start
  super
  runner = method(:run)
  @thread = Thread.new(&runner)
end