Class: EventSourcery::Postgres::Tracker

Inherits:
Object
Defined in:
lib/event_sourcery/postgres/tracker.rb

Overview

This will set up a persisted event id tracker for processors.

Constructor Details

#initialize(db_connection = EventSourcery::Postgres.config.projections_database, table_name: EventSourcery::Postgres.config.tracker_table_name, obtain_processor_lock: true) ⇒ Tracker

Returns a new instance of Tracker.



# File 'lib/event_sourcery/postgres/tracker.rb', line 6

def initialize(db_connection = EventSourcery::Postgres.config.projections_database,
               table_name: EventSourcery::Postgres.config.tracker_table_name,
               obtain_processor_lock: true)
  @db_connection = db_connection
  @table_name = table_name.to_sym
  @obtain_processor_lock = obtain_processor_lock
end

Instance Method Details

#last_processed_event_id(processor_name) ⇒ Int?

This will return the last processed event id for the given processor name.

Parameters:

  • processor_name

    the name of the processor you want to look up

Returns:

  • (Int, nil)

    the value of the last event_id processed



# File 'lib/event_sourcery/postgres/tracker.rb', line 70

def last_processed_event_id(processor_name)
  track_entry = table.where(name: processor_name.to_s).first
  track_entry[:last_processed_event_id] if track_entry
end
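
Because the method returns nil when no tracker row exists for the processor, callers may want a fallback value. The following is a minimal sketch, assuming a hypothetical in-memory stand-in (`LookupOnlyTracker`) that mimics only the lookup shown above; it is not the real Postgres-backed class, and the processor names are made up for illustration.

```ruby
# Hypothetical stand-in for illustration only; the real Tracker reads from a Postgres table.
class LookupOnlyTracker
  def initialize(rows)
    @rows = rows # array of hashes shaped like tracker table rows (assumed shape)
  end

  # Mirrors the lookup above: value from the first matching row, or nil when there is none.
  def last_processed_event_id(processor_name)
    track_entry = @rows.find { |row| row[:name] == processor_name.to_s }
    track_entry[:last_processed_event_id] if track_entry
  end
end

tracker = LookupOnlyTracker.new(
  [{ name: 'invoice_projector', last_processed_event_id: 42 }]
)

known_id   = tracker.last_processed_event_id(:invoice_projector)
unknown_id = tracker.last_processed_event_id(:never_set_up)

# Fall back to 0 (the value reset_last_processed_event_id writes) when untracked.
resume_from = unknown_id || 0
```

Symbols and strings are interchangeable here because the name is converted with to_s before the lookup.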

#processed_event(processor_name, event_id) ⇒ Object

This will update the tracker table to the given event id value for the given processor name.

Parameters:

  • processor_name

    the name of the processor to update

  • event_id

    the event id number to update to



# File 'lib/event_sourcery/postgres/tracker.rb', line 39

def processed_event(processor_name, event_id)
  table.
    where(name: processor_name.to_s).
    update(last_processed_event_id: event_id)
  true
end

#processing_event(processor_name, event_id) ⇒ Object

This allows you to process an event and update the tracker table in a single transaction. It yields to the given block first, then updates the tracker table to the given event id for the given processor name.

Parameters:

  • processor_name

    the name of the processor to update

  • event_id

    the event id number to update to



# File 'lib/event_sourcery/postgres/tracker.rb', line 52

def processing_event(processor_name, event_id)
  @db_connection.transaction do
    yield
    processed_event(processor_name, event_id)
  end
end
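
The ordering matters: since the block runs before the tracker update and both sit inside one transaction, a failing block leaves the tracker where it was. The following is a rough sketch of that behaviour using hypothetical in-memory stand-ins (`FakeConnection`, `FakeTracker`). It assumes the connection's `transaction` undoes changes and re-raises when the block raises, which is how Sequel behaves by default; the stand-in only imitates that with a snapshot.

```ruby
# Hypothetical in-memory stand-ins, for illustration only.
class FakeConnection
  attr_reader :rows

  def initialize
    @rows = { 'invoice_projector' => 0 } # processor name => last processed event id
  end

  # Crude transaction: restore the previous state if the block raises, then re-raise.
  def transaction
    snapshot = @rows.dup
    yield
  rescue StandardError
    @rows = snapshot
    raise
  end
end

class FakeTracker
  def initialize(db_connection)
    @db_connection = db_connection
  end

  def processed_event(processor_name, event_id)
    @db_connection.rows[processor_name.to_s] = event_id
    true
  end

  # Same shape as the method above: yield first, then record the event id.
  def processing_event(processor_name, event_id)
    @db_connection.transaction do
      yield
      processed_event(processor_name, event_id)
    end
  end
end

db = FakeConnection.new
tracker = FakeTracker.new(db)

# Successful block: the tracker advances to event 1.
tracker.processing_event(:invoice_projector, 1) { :projected }
after_success = db.rows['invoice_projector']

# Failing block: the error propagates and the tracker is not advanced to 2.
begin
  tracker.processing_event(:invoice_projector, 2) { raise 'projection failed' }
rescue RuntimeError
  # Expected in this sketch; a real processor would decide how to handle it.
end
after_failure = db.rows['invoice_projector']
```

With a real database connection, any writes the block made through the same connection would be rolled back together with the tracker update.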

#reset_last_processed_event_id(processor_name) ⇒ Object

This will reset the tracker to the start (0) for the given processor name.

Parameters:

  • processor_name

    the name of the processor to reset to 0



# File 'lib/event_sourcery/postgres/tracker.rb', line 62

def reset_last_processed_event_id(processor_name)
  table.where(name: processor_name.to_s).update(last_processed_event_id: 0)
end

#setup(processor_name = nil) ⇒ Object

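Set up the given processor. This will create the projector tracker table if it does not exist and auto_create_projector_tracker is enabled; if the table is still missing, it raises UnableToLockProcessorError. If given a processor_name, it will create a tracker entry for that processor if needed and then, unless obtain_processor_lock is false, attempt to get a lock on the db.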

Parameters:

  • processor_name (defaults to: nil)

    the name of the processor



# File 'lib/event_sourcery/postgres/tracker.rb', line 19

def setup(processor_name = nil)
  create_table_if_not_exists if EventSourcery::Postgres.config.auto_create_projector_tracker

  unless tracker_table_exists?
    raise UnableToLockProcessorError, 'Projector tracker table does not exist'
  end

  if processor_name
    create_track_entry_if_not_exists(processor_name)
    if @obtain_processor_lock
      obtain_global_lock_on_processor(processor_name)
    end
  end
end
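
The branches above can be summarised with a small sketch. `SetupFlowSketch` is a hypothetical stand-in that only records which steps would run; it skips table creation, touches no database, and raises a plain RuntimeError where the real method raises UnableToLockProcessorError.

```ruby
# Hypothetical stand-in that records which setup steps run; not the real class.
class SetupFlowSketch
  attr_reader :steps

  def initialize(table_exists:, obtain_processor_lock: true)
    @table_exists = table_exists
    @obtain_processor_lock = obtain_processor_lock
    @steps = []
  end

  def setup(processor_name = nil)
    # The real method raises UnableToLockProcessorError here.
    raise 'Projector tracker table does not exist' unless @table_exists
    return unless processor_name

    @steps << :create_track_entry
    @steps << :obtain_lock if @obtain_processor_lock
  end
end

# No processor name: only the table check runs.
table_only = SetupFlowSketch.new(table_exists: true)
table_only.setup

# Processor name with locking enabled (the default).
with_lock = SetupFlowSketch.new(table_exists: true)
with_lock.setup(:invoice_projector)

# Processor name with locking disabled via the constructor option.
without_lock = SetupFlowSketch.new(table_exists: true, obtain_processor_lock: false)
without_lock.setup(:invoice_projector)

# Missing table: setup fails before any entry or lock is attempted.
missing_table_error =
  begin
    SetupFlowSketch.new(table_exists: false).setup(:invoice_projector)
    nil
  rescue RuntimeError => e
    e.message
  end
```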

#tracked_processors ⇒ Array

Will return an array of all known tracked processors.

Returns:

  • (Array)

    array of all known tracked processors



# File 'lib/event_sourcery/postgres/tracker.rb', line 78

def tracked_processors
  table.select_map(:name)
end