Class: WCC::Contentful::SyncEngine
- Inherits:
-
Object
- Object
- WCC::Contentful::SyncEngine
- Includes:
- Wisper::Publisher
- Defined in:
- lib/wcc/contentful/sync_engine.rb
Overview
The SyncEngine is used to keep the currently configured store up to date using the Sync API. It is available on the WCC::Contentful::Services instance, and the application is responsible to periodically call #next in order to hit the sync API and update the store.
If you have mounted the WCC::Contentful::Engine, AND the configured store is one that can be synced (i.e. it responds to ‘:index`), then the WCC::Contentful::WebhookController will call #next automatically anytime a webhook is received. Otherwise you should hook up to the Webhook events and call the sync engine via your initializer:
WCC::Contentful::Events.subscribe(proc do |event|
WCC::Contentful::Services.instance.sync_engine.next(up_to: event.dig('sys', 'id'))
end, with: :call)
Defined Under Namespace
Classes: Job
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#store ⇒ Object
readonly
Returns the value of attribute store.
Instance Method Summary collapse
- #emit_event(event) ⇒ Object
- #emit_sync_complete(events) ⇒ Object
-
#initialize(client: nil, store: nil, state: nil, **options) ⇒ SyncEngine
constructor
A new instance of SyncEngine.
-
#next(up_to_id: nil) ⇒ Array
Gets the next increment of data from the Sync API.
- #should_sync? ⇒ Boolean
- #state ⇒ Object
Constructor Details
#initialize(client: nil, store: nil, state: nil, **options) ⇒ SyncEngine
Returns a new instance of SyncEngine.
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/wcc/contentful/sync_engine.rb', line 40 def initialize(client: nil, store: nil, state: nil, **) @options = { key: "sync:#{object_id}" }.merge!().freeze @state_key = @options[:key] || "sync:#{object_id}" @client = client || WCC::Contentful::Services.instance.client @mutex = Mutex.new if store unless %i[index index? find].all? { |m| store.respond_to?(m) } raise ArgumentError, ':store param must implement the Store interface' end @store = store end if state @state = token_wrapper_factory(state) raise ArgumentError, ':state param must be a String or Hash' unless @state.is_a? Hash raise ArgumentError, ':state param must be of sys.type = "token"' unless @state.dig('sys', 'type') == 'token' end raise ArgumentError, 'either :state or :store must be provided' unless @state || @store end |
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
34 35 36 |
# File 'lib/wcc/contentful/sync_engine.rb', line 34 def client @client end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
34 35 36 |
# File 'lib/wcc/contentful/sync_engine.rb', line 34 def @options end |
#store ⇒ Object (readonly)
Returns the value of attribute store.
34 35 36 |
# File 'lib/wcc/contentful/sync_engine.rb', line 34 def store @store end |
Instance Method Details
#emit_event(event) ⇒ Object
104 105 106 107 108 109 |
# File 'lib/wcc/contentful/sync_engine.rb', line 104 def emit_event(event) type = event.dig('sys', 'type') raise ArgumentError, "Unknown event type #{event}" unless type.present? broadcast(type, event) end |
#emit_sync_complete(events) ⇒ Object
111 112 113 114 |
# File 'lib/wcc/contentful/sync_engine.rb', line 111 def emit_sync_complete(events) event = WCC::Contentful::Event::SyncComplete.new(events, source: self) broadcast('SyncComplete', event) end |
#next(up_to_id: nil) ⇒ Array
Gets the next increment of data from the Sync API. If the configured store responds to ‘:index`, that will be called with each item in the Sync response to update the store. If a block is passed, that block will be evaluated with each item in the response.
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/wcc/contentful/sync_engine.rb', line 73 def next(up_to_id: nil) id_found = up_to_id.nil? all_events = [] @mutex.synchronize do @state ||= read_state || token_wrapper_factory(nil) sync_token = @state['token'] next_sync_token = client.sync(sync_token: sync_token) do |item| id = item.dig('sys', 'id') id_found ||= id == up_to_id store.index(item) if store&.index? event = WCC::Contentful::Event.from_raw(item, source: self) yield(event) if block_given? emit_event(event) # Only keep the "sys" not the content in case we have a large space all_events << WCC::Contentful::Event.from_raw(item.slice('sys'), source: self) end @state = @state.merge('token' => next_sync_token) write_state end emit_sync_complete(all_events) [id_found, all_events.length] end |
#should_sync? ⇒ Boolean
36 37 38 |
# File 'lib/wcc/contentful/sync_engine.rb', line 36 def should_sync? store&.index? end |
#state ⇒ Object
30 31 32 |
# File 'lib/wcc/contentful/sync_engine.rb', line 30 def state (@state&.dup || token_wrapper_factory(nil)).freeze end |