Class: WCC::Contentful::SyncEngine

Inherits:
Object
Includes:
Wisper::Publisher
Defined in:
lib/wcc/contentful/sync_engine.rb

Overview

The SyncEngine keeps the currently configured store up to date using the Sync API. It is available on the WCC::Contentful::Services instance, and the application is responsible for periodically calling #next in order to hit the Sync API and update the store.

If you have mounted the WCC::Contentful::Engine, and the configured store is one that can be synced (i.e. it responds to `:index`), then the WCC::Contentful::WebhookController will call #next automatically any time a webhook is received. Otherwise you should hook up to the Webhook events and call the sync engine via your initializer:

WCC::Contentful::Events.subscribe(proc do |event|
  WCC::Contentful::Services.instance.sync_engine.next(up_to_id: event.dig('sys', 'id'))
end, with: :call)
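
Where webhooks are not available, the periodic call described above can be a plain loop. The following is a minimal sketch, not part of the library: `poll_sync` and `FakeEngine` are hypothetical names, and the fake stands in for `WCC::Contentful::Services.instance.sync_engine` so that the example runs on its own.

```ruby
# Hypothetical stand-in for the real sync engine so the sketch is runnable.
# It mimics only the documented return shape of #next: [id_found, item_count].
class FakeEngine
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def next(up_to_id: nil)
    @calls += 1
    [up_to_id.nil?, 0]
  end
end

# Calls engine.next a fixed number of times, sleeping between calls.
# Returns the total number of items reported across all calls.
def poll_sync(engine, times:, interval: 0)
  total = 0
  times.times do
    _found, count = engine.next
    total += count
    sleep(interval) if interval.positive?
  end
  total
end

engine = FakeEngine.new
poll_sync(engine, times: 3)
```

In a real application the loop would more likely live in a scheduled background job than in a blocking call; the fixed `times:` argument is only there to keep the sketch finite.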

Defined Under Namespace

Classes: Job

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(client: nil, store: nil, state: nil, **options) ⇒ SyncEngine

Returns a new instance of SyncEngine.

Raises:

  • (ArgumentError)


# File 'lib/wcc/contentful/sync_engine.rb', line 40

def initialize(client: nil, store: nil, state: nil, **options)
  @options = {
    key: "sync:#{object_id}"
  }.merge!(options).freeze

  @state_key = @options[:key] || "sync:#{object_id}"
  @client = client || WCC::Contentful::Services.instance.client
  @mutex = Mutex.new

  if store
    unless %i[index index? find].all? { |m| store.respond_to?(m) }
      raise ArgumentError, ':store param must implement the Store interface'
    end

    @store = store
  end
  if state
    @state = token_wrapper_factory(state)
    raise ArgumentError, ':state param must be a String or Hash' unless @state.is_a? Hash
    raise ArgumentError, ':state param must be of sys.type = "token"' unless @state.dig('sys', 'type') == 'token'
  end
  raise ArgumentError, 'either :state or :store must be provided' unless @state || @store
end
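
The constructor only checks that `:store` responds to `index`, `index?` and `find`. As an illustration of that duck-typing check, here is a minimal sketch of an object that would pass it. `MemoryStoreSketch` is a hypothetical class, not one shipped with the library, and a real store would likely need more than these three methods.

```ruby
# Hypothetical minimal store; not part of wcc-contentful.
class MemoryStoreSketch
  def initialize
    @items = {}
  end

  # Tells the sync engine that this store wants #index called for each item.
  def index?
    true
  end

  # Stores the raw item under its sys.id and returns it.
  def index(item)
    @items[item.dig('sys', 'id')] = item
  end

  # Looks up a previously indexed item by ID; nil when absent.
  def find(id)
    @items[id]
  end
end

store = MemoryStoreSketch.new
# The same duck-typing check the constructor performs on :store.
implements_store = %i[index index? find].all? { |m| store.respond_to?(m) }
store.index({ 'sys' => { 'id' => 'abc', 'type' => 'Entry' } })
```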

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



# File 'lib/wcc/contentful/sync_engine.rb', line 34

def client
  @client
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/wcc/contentful/sync_engine.rb', line 34

def options
  @options
end

#store ⇒ Object (readonly)

Returns the value of attribute store.



# File 'lib/wcc/contentful/sync_engine.rb', line 34

def store
  @store
end

Instance Method Details

#emit_event(event) ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/wcc/contentful/sync_engine.rb', line 104

def emit_event(event)
  type = event.dig('sys', 'type')
  raise ArgumentError, "Unknown event type #{event}" unless type.present?

  broadcast(type, event)
end

#emit_sync_complete(events) ⇒ Object



# File 'lib/wcc/contentful/sync_engine.rb', line 111

def emit_sync_complete(events)
  event = WCC::Contentful::Event::SyncComplete.new(events, source: self)
  broadcast('SyncComplete', event)
end

#next(up_to_id: nil) ⇒ Array

Gets the next increment of data from the Sync API. If the configured store responds to `:index` and its `index?` returns true, `index` is called with each item in the Sync response to update the store. If a block is passed, it is called with the event built from each item in the response.

Parameters:

  • up_to_id (String) (defaults to: nil)

    An ID to look for in the response. The first element of the returned tuple is true if the ID was found or no up_to_id was given, and false if the ID did not come back.

Returns:

  • (Array)

    A `[Boolean, Integer]` tuple where the first value is whether the ID was found, and the second value is the number of items returned.



# File 'lib/wcc/contentful/sync_engine.rb', line 73

def next(up_to_id: nil)
  id_found = up_to_id.nil?
  all_events = []

  @mutex.synchronize do
    @state ||= read_state || token_wrapper_factory(nil)
    sync_token = @state['token']

    next_sync_token =
      client.sync(sync_token: sync_token) do |item|
        id = item.dig('sys', 'id')
        id_found ||= id == up_to_id

        store.index(item) if store&.index?
        event = WCC::Contentful::Event.from_raw(item, source: self)
        yield(event) if block_given?
        emit_event(event)

        # Only keep the "sys" not the content in case we have a large space
        all_events << WCC::Contentful::Event.from_raw(item.slice('sys'), source: self)
      end

    @state = @state.merge('token' => next_sync_token)
    write_state
  end

  emit_sync_complete(all_events)

  [id_found, all_events.length]
end
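
Because #next reports whether `up_to_id` was seen, a caller can repeat the call until the expected item arrives, which may be useful if a webhook is delivered before the item shows up in the sync response. The following is a minimal sketch under that assumption. `sync_until` and `DelayedEngine` are hypothetical names introduced for illustration, and the fake engine only imitates the `[Boolean, Integer]` return value.

```ruby
# Hypothetical engine stand-in: reports the ID as found once it has been
# called `found_on_call` times, and always reports one item per call.
class DelayedEngine
  def initialize(found_on_call:)
    @found_on_call = found_on_call
    @calls = 0
  end

  def next(up_to_id: nil)
    @calls += 1
    [up_to_id.nil? || @calls >= @found_on_call, 1]
  end
end

# Calls #next until the ID shows up or max_attempts is reached.
# Returns [found, total_items].
def sync_until(engine, id, max_attempts: 5, wait: 0)
  total = 0
  max_attempts.times do
    found, count = engine.next(up_to_id: id)
    total += count
    return [true, total] if found

    sleep(wait) if wait.positive?
  end
  [false, total]
end

result = sync_until(DelayedEngine.new(found_on_call: 3), 'abc')
```

Capping the number of attempts keeps a missing or deleted ID from turning the loop into an endless series of Sync API requests.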

#should_sync? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wcc/contentful/sync_engine.rb', line 36

def should_sync?
  store&.index?
end

#state ⇒ Object



# File 'lib/wcc/contentful/sync_engine.rb', line 30

def state
  (@state&.dup || token_wrapper_factory(nil)).freeze
end