import { AccountSyncOperation } from '@Lib/Services/Sync/Account/Operation' import { ContentType } from '@standardnotes/common' import { extendArray, isNotUndefined, isNullOrUndefined, removeFromIndex, sleep, subtractFromArray, useBoolean, Uuids, } from '@standardnotes/utils' import { ItemManager } from '@Lib/Services/Items/ItemManager' import { OfflineSyncOperation } from '@Lib/Services/Sync/Offline/Operation' import { PayloadManager } from '../Payloads/PayloadManager' import { SNApiService } from '../Api/ApiService' import { SNHistoryManager } from '../History/HistoryManager' import { SNLog } from '@Lib/Log' import { SNSessionManager } from '../Session/SessionManager' import { DiskStorageService } from '../Storage/DiskStorageService' import { GetSortedPayloadsByPriority } from '@Lib/Services/Sync/Utils' import { SyncClientInterface } from './SyncClientInterface' import { SyncPromise } from './Types' import { SyncOpStatus } from '@Lib/Services/Sync/SyncOpStatus' import { ServerSyncResponse } from '@Lib/Services/Sync/Account/Response' import { ServerSyncResponseResolver } from '@Lib/Services/Sync/Account/ResponseResolver' import { SyncSignal, SyncStats } from '@Lib/Services/Sync/Signals' import { UuidString } from '../../Types/UuidString' import { PayloadSource, CreateDecryptedItemFromPayload, FilterDisallowedRemotePayloadsAndMap, DeltaOutOfSync, ImmutablePayloadCollection, CreatePayload, FullyFormedTransferPayload, isEncryptedPayload, isDecryptedPayload, EncryptedPayloadInterface, DecryptedPayloadInterface, ItemsKeyContent, FullyFormedPayloadInterface, DeletedPayloadInterface, DecryptedPayload, CreateEncryptedServerSyncPushPayload, ServerSyncPushContextualPayload, isDeletedItem, DeletedItemInterface, DecryptedItemInterface, CreatePayloadSplit, CreateDeletedServerSyncPushPayload, ItemsKeyInterface, CreateNonDecryptedPayloadSplit, DeltaOfflineSaved, FilteredServerItem, PayloadEmitSource, getIncrementedDirtyIndex, getCurrentDirtyIndex, ItemContent, } from '@standardnotes/models' import { AbstractService, SyncEvent, SyncSource, InternalEventHandlerInterface, InternalEventBusInterface, StorageKey, InternalEventInterface, IntegrityEvent, IntegrityEventPayload, SyncMode, SyncOptions, SyncQueueStrategy, SyncServiceInterface, DiagnosticInfo, EncryptionService, } from '@standardnotes/services' import { OfflineSyncResponse } from './Offline/Response' import { CreateDecryptionSplitWithKeyLookup, CreateEncryptionSplitWithKeyLookup, KeyedDecryptionSplit, SplitPayloadsByEncryptionType, } from '@standardnotes/encryption' import { CreatePayloadFromRawServerItem } from './Account/Utilities' import { ApplicationSyncOptions } from '@Lib/Application/Options/OptionalOptions' const DEFAULT_MAJOR_CHANGE_THRESHOLD = 15 const INVALID_SESSION_RESPONSE_STATUS = 401 /** * The sync service orchestrates with the model manager, api service, and storage service * to ensure consistent state between the three. When a change is made to an item, consumers * call the sync service's sync function to first persist pending changes to local storage. * Then, the items are uploaded to the server. The sync service handles server responses, * including mapping any retrieved items to application state via model manager mapping. * After each sync request, any changes made or retrieved are also persisted locally. * The sync service largely does not perform any task unless it is called upon. */ export class SNSyncService extends AbstractService implements SyncServiceInterface, InternalEventHandlerInterface, SyncClientInterface { private dirtyIndexAtLastPresyncSave?: number private lastSyncDate?: Date private outOfSync = false private opStatus: SyncOpStatus private resolveQueue: SyncPromise[] = [] private spawnQueue: SyncPromise[] = [] /* A DownloadFirst sync must always be the first sync completed */ public completedOnlineDownloadFirstSync = false private majorChangeThreshold = DEFAULT_MAJOR_CHANGE_THRESHOLD private clientLocked = false private databaseLoaded = false private syncToken?: string private cursorToken?: string private syncLock = false private _simulate_latency?: { latency: number; enabled: boolean } private dealloced = false public lastSyncInvokationPromise?: Promise public currentSyncRequestPromise?: Promise /** Content types appearing first are always mapped first */ private readonly localLoadPriorty = [ ContentType.ItemsKey, ContentType.UserPrefs, ContentType.Component, ContentType.Theme, ] constructor( private itemManager: ItemManager, private sessionManager: SNSessionManager, private protocolService: EncryptionService, private storageService: DiskStorageService, private payloadManager: PayloadManager, private apiService: SNApiService, private historyService: SNHistoryManager, private readonly options: ApplicationSyncOptions, protected override internalEventBus: InternalEventBusInterface, ) { super(internalEventBus) this.opStatus = this.initializeStatus() } /** * If the database has been newly created (because its new or was previously destroyed) * we want to reset any sync tokens we have. */ public async onNewDatabaseCreated(): Promise { if (await this.getLastSyncToken()) { await this.clearSyncPositionTokens() } } private get launchPriorityUuids() { return this.storageService.getValue(StorageKey.LaunchPriorityUuids) ?? [] } public setLaunchPriorityUuids(launchPriorityUuids: string[]) { this.storageService.setValue(StorageKey.LaunchPriorityUuids, launchPriorityUuids) } public override deinit(): void { this.dealloced = true ;(this.sessionManager as unknown) = undefined ;(this.itemManager as unknown) = undefined ;(this.protocolService as unknown) = undefined ;(this.payloadManager as unknown) = undefined ;(this.storageService as unknown) = undefined ;(this.apiService as unknown) = undefined this.opStatus.reset() ;(this.opStatus as unknown) = undefined this.resolveQueue.length = 0 this.spawnQueue.length = 0 super.deinit() } private initializeStatus() { return new SyncOpStatus(setInterval, (event) => { void this.notifyEvent(event) }) } public lockSyncing(): void { this.clientLocked = true } public unlockSyncing(): void { this.clientLocked = false } public isOutOfSync(): boolean { return this.outOfSync } public getLastSyncDate(): Date | undefined { return this.lastSyncDate } public getSyncStatus(): SyncOpStatus { return this.opStatus } /** * Called by application when sign in or registration occurs. */ public resetSyncState(): void { this.dirtyIndexAtLastPresyncSave = undefined this.lastSyncDate = undefined this.outOfSync = false } public isDatabaseLoaded(): boolean { return this.databaseLoaded } /** * Used in tandem with `loadDatabasePayloads` */ public async getDatabasePayloads(): Promise { return this.storageService.getAllRawPayloads().catch((error) => { void this.notifyEvent(SyncEvent.DatabaseReadError, error) throw error }) } private async processItemsKeysFirstDuringDatabaseLoad( itemsKeysPayloads: FullyFormedPayloadInterface[], ): Promise { const encryptedItemsKeysPayloads = itemsKeysPayloads.filter(isEncryptedPayload) const originallyDecryptedItemsKeysPayloads = itemsKeysPayloads.filter( isDecryptedPayload, ) as DecryptedPayloadInterface[] const itemsKeysSplit: KeyedDecryptionSplit = { usesRootKeyWithKeyLookup: { items: encryptedItemsKeysPayloads, }, } const newlyDecryptedItemsKeys = await this.protocolService.decryptSplit(itemsKeysSplit) await this.payloadManager.emitPayloads( [...originallyDecryptedItemsKeysPayloads, ...newlyDecryptedItemsKeys], PayloadEmitSource.LocalDatabaseLoaded, ) } /** * @param rawPayloads - use `getDatabasePayloads` to get these payloads. * They are fed as a parameter so that callers don't have to await the loading, but can * await getting the raw payloads from storage */ public async loadDatabasePayloads(rawPayloads: FullyFormedTransferPayload[]): Promise { if (this.databaseLoaded) { throw 'Attempting to initialize already initialized local database.' } if (rawPayloads.length === 0) { this.databaseLoaded = true this.opStatus.setDatabaseLoadStatus(0, 0, true) return } const unsortedPayloads = rawPayloads .map((rawPayload) => { try { return CreatePayload(rawPayload, PayloadSource.Constructor) } catch (e) { console.error('Creating payload fail+ed', e) return undefined } }) .filter(isNotUndefined) const { itemsKeyPayloads, contentTypePriorityPayloads, remainingPayloads } = GetSortedPayloadsByPriority( unsortedPayloads, this.localLoadPriorty, this.launchPriorityUuids, ) await this.processItemsKeysFirstDuringDatabaseLoad(itemsKeyPayloads) await this.processPayloadBatch(contentTypePriorityPayloads) /** * Map in batches to give interface a chance to update. Note that total decryption * time is constant regardless of batch size. Decrypting 3000 items all at once or in * batches will result in the same time spent. It's the emitting/painting/rendering * that requires batch size optimization. */ const payloadCount = remainingPayloads.length const batchSize = this.options.loadBatchSize const numBatches = Math.ceil(payloadCount / batchSize) for (let batchIndex = 0; batchIndex < numBatches; batchIndex++) { const currentPosition = batchIndex * batchSize const batch = remainingPayloads.slice(currentPosition, currentPosition + batchSize) await this.processPayloadBatch(batch, currentPosition, payloadCount) } this.databaseLoaded = true this.opStatus.setDatabaseLoadStatus(0, 0, true) } private async processPayloadBatch( batch: FullyFormedPayloadInterface[], currentPosition?: number, payloadCount?: number, ) { const encrypted: EncryptedPayloadInterface[] = [] const nonencrypted: (DecryptedPayloadInterface | DeletedPayloadInterface)[] = [] for (const payload of batch) { if (isEncryptedPayload(payload)) { encrypted.push(payload) } else { nonencrypted.push(payload) } } const split: KeyedDecryptionSplit = { usesItemsKeyWithKeyLookup: { items: encrypted, }, } const results = await this.protocolService.decryptSplit(split) await this.payloadManager.emitPayloads([...nonencrypted, ...results], PayloadEmitSource.LocalDatabaseLoaded) void this.notifyEvent(SyncEvent.LocalDataIncrementalLoad) if (currentPosition != undefined && payloadCount != undefined) { this.opStatus.setDatabaseLoadStatus(currentPosition, payloadCount, false) } await sleep(1, false) } private setLastSyncToken(token: string) { this.syncToken = token return this.storageService.setValue(StorageKey.LastSyncToken, token) } private async setPaginationToken(token: string) { this.cursorToken = token if (token) { return this.storageService.setValue(StorageKey.PaginationToken, token) } else { return this.storageService.removeValue(StorageKey.PaginationToken) } } private async getLastSyncToken(): Promise { if (!this.syncToken) { this.syncToken = (await this.storageService.getValue(StorageKey.LastSyncToken)) as string } return this.syncToken } private async getPaginationToken(): Promise { if (!this.cursorToken) { this.cursorToken = (await this.storageService.getValue(StorageKey.PaginationToken)) as string } return this.cursorToken } private async clearSyncPositionTokens() { this.syncToken = undefined this.cursorToken = undefined await this.storageService.removeValue(StorageKey.LastSyncToken) await this.storageService.removeValue(StorageKey.PaginationToken) } private itemsNeedingSync() { return this.itemManager.getDirtyItems() } public async markAllItemsAsNeedingSyncAndPersist(): Promise { this.log('Marking all items as needing sync') const items = this.itemManager.items const payloads = items.map((item) => { return new DecryptedPayload({ ...item.payload.ejected(), dirty: true, dirtyIndex: getIncrementedDirtyIndex(), }) }) await this.payloadManager.emitPayloads(payloads, PayloadEmitSource.LocalChanged) await this.persistPayloads(payloads) } /** * Return the payloads that need local persistence, before beginning a sync. * This way, if the application is closed before a sync request completes, * pending data will be saved to disk, and synced the next time the app opens. */ private popPayloadsNeedingPreSyncSave(from: (DecryptedPayloadInterface | DeletedPayloadInterface)[]) { const lastPreSyncSave = this.dirtyIndexAtLastPresyncSave if (lastPreSyncSave == undefined) { return from } const payloads = from.filter((candidate) => { return !candidate.dirtyIndex || candidate.dirtyIndex > lastPreSyncSave }) this.dirtyIndexAtLastPresyncSave = getCurrentDirtyIndex() return payloads } private queueStrategyResolveOnNext(): Promise { return new Promise((resolve, reject) => { this.resolveQueue.push({ resolve, reject }) }) } private queueStrategyForceSpawnNew(options: SyncOptions) { return new Promise((resolve, reject) => { this.spawnQueue.push({ resolve, reject, options }) }) } /** * For timing strategy SyncQueueStrategy.ForceSpawnNew, we will execute a whole sync request * and pop it from the queue. */ private popSpawnQueue() { if (this.spawnQueue.length === 0) { return null } const promise = this.spawnQueue[0] removeFromIndex(this.spawnQueue, 0) this.log('Syncing again from spawn queue') return this.sync({ queueStrategy: SyncQueueStrategy.ForceSpawnNew, source: SyncSource.SpawnQueue, ...promise.options, }) .then(() => { promise.resolve() }) .catch(() => { promise.reject() }) } private async payloadsByPreparingForServer( payloads: (DecryptedPayloadInterface | DeletedPayloadInterface)[], ): Promise { const payloadSplit = CreatePayloadSplit(payloads) const encryptionSplit = SplitPayloadsByEncryptionType(payloadSplit.decrypted) const keyLookupSplit = CreateEncryptionSplitWithKeyLookup(encryptionSplit) const encryptedResults = await this.protocolService.encryptSplit(keyLookupSplit) const contextPayloads = [ ...encryptedResults.map(CreateEncryptedServerSyncPushPayload), ...payloadSplit.deleted.map(CreateDeletedServerSyncPushPayload), ] return contextPayloads } public async downloadFirstSync(waitTimeOnFailureMs: number, otherSyncOptions?: Partial): Promise { const maxTries = 5 for (let i = 0; i < maxTries; i++) { await this.sync({ mode: SyncMode.DownloadFirst, queueStrategy: SyncQueueStrategy.ForceSpawnNew, source: SyncSource.External, ...otherSyncOptions, }).catch(console.error) if (this.completedOnlineDownloadFirstSync) { return } else { await sleep(waitTimeOnFailureMs) } } console.error(`Failed downloadFirstSync after ${maxTries} tries`) } public async awaitCurrentSyncs(): Promise { await this.lastSyncInvokationPromise await this.currentSyncRequestPromise } public async sync(options: Partial = {}): Promise { if (this.clientLocked) { this.log('Sync locked by client') return } const fullyResolvedOptions: SyncOptions = { source: SyncSource.External, ...options, } this.lastSyncInvokationPromise = this.performSync(fullyResolvedOptions) return this.lastSyncInvokationPromise } private async prepareForSync(options: SyncOptions) { const items = this.itemsNeedingSync() /** * Freeze the begin date immediately after getting items needing sync. This way an * item dirtied at any point after this date is marked as needing another sync */ const beginDate = new Date() const frozenDirtyIndex = getCurrentDirtyIndex() /** * Items that have never been synced and marked as deleted should not be * uploaded to server, and instead deleted directly after sync completion. */ const neverSyncedDeleted: DeletedItemInterface[] = items.filter((item) => { return item.neverSynced && isDeletedItem(item) }) as DeletedItemInterface[] subtractFromArray(items, neverSyncedDeleted) const decryptedPayloads = items.map((item) => { return item.payloadRepresentation() }) const payloadsNeedingSave = this.popPayloadsNeedingPreSyncSave(decryptedPayloads) await this.persistPayloads(payloadsNeedingSave) if (options.onPresyncSave) { options.onPresyncSave() } return { items, beginDate, frozenDirtyIndex, neverSyncedDeleted } } /** * Allows us to lock this function from triggering duplicate network requests. * There are two types of locking checks: * 1. syncLocked(): If a call to sync() call has begun preparing to be sent to the server. * but not yet completed all the code below before reaching that point. * (before reaching opStatus.setDidBegin). * 2. syncOpInProgress: If a sync() call is in flight to the server. */ private configureSyncLock() { const syncInProgress = this.opStatus.syncInProgress const databaseLoaded = this.databaseLoaded const canExecuteSync = !this.syncLock const shouldExecuteSync = canExecuteSync && databaseLoaded && !syncInProgress if (shouldExecuteSync) { this.syncLock = true } else { this.log( !canExecuteSync ? 'Another function call has begun preparing for sync.' : syncInProgress ? 'Attempting to sync while existing sync in progress.' : 'Attempting to sync before local database has loaded.', ) } const releaseLock = () => { this.syncLock = false } return { shouldExecuteSync, releaseLock } } private deferSyncRequest(options: SyncOptions) { const useStrategy = !isNullOrUndefined(options.queueStrategy) ? options.queueStrategy : SyncQueueStrategy.ResolveOnNext if (useStrategy === SyncQueueStrategy.ResolveOnNext) { return this.queueStrategyResolveOnNext() } else if (useStrategy === SyncQueueStrategy.ForceSpawnNew) { return this.queueStrategyForceSpawnNew({ mode: options.mode, checkIntegrity: options.checkIntegrity, source: options.source, }) } else { throw Error(`Unhandled timing strategy ${useStrategy}`) } } private async prepareForSyncExecution( items: (DecryptedItemInterface | DeletedItemInterface)[], inTimeResolveQueue: SyncPromise[], beginDate: Date, frozenDirtyIndex: number, ) { this.opStatus.setDidBegin() await this.notifyEvent(SyncEvent.SyncWillBegin) /** * Subtract from array as soon as we're sure they'll be called. * resolves are triggered at the end of this function call */ subtractFromArray(this.resolveQueue, inTimeResolveQueue) /** * lastSyncBegan must be set *after* any point we may have returned above. * Setting this value means the item was 100% sent to the server. */ if (items.length > 0) { return this.itemManager.setLastSyncBeganForItems(items, beginDate, frozenDirtyIndex) } else { return items } } /** * The InTime resolve queue refers to any sync requests that were made while we still * have not sent out the current request. So, anything in the InTime resolve queue * will have made it "in time" to piggyback on the current request. Anything that comes * after InTime will schedule a new sync request. */ private getPendingRequestsMadeInTimeToPiggyBackOnCurrentRequest() { return this.resolveQueue.slice() } private getOfflineSyncParameters( payloads: (DecryptedPayloadInterface | DeletedPayloadInterface)[], mode: SyncMode = SyncMode.Default, ): { uploadPayloads: (DecryptedPayloadInterface | DeletedPayloadInterface)[] } { const uploadPayloads: (DecryptedPayloadInterface | DeletedPayloadInterface)[] = mode === SyncMode.Default ? payloads : [] return { uploadPayloads } } private createOfflineSyncOperation( payloads: (DeletedPayloadInterface | DecryptedPayloadInterface)[], source: SyncSource, mode: SyncMode = SyncMode.Default, ) { this.log('Syncing offline user', 'source:', source, 'mode:', mode, 'payloads:', payloads) const operation = new OfflineSyncOperation(payloads, async (type, response) => { if (this.dealloced) { return } if (type === SyncSignal.Response && response) { await this.handleOfflineResponse(response) } }) return operation } private async getOnlineSyncParameters( payloads: (DecryptedPayloadInterface | DeletedPayloadInterface)[], mode: SyncMode = SyncMode.Default, ): Promise<{ uploadPayloads: ServerSyncPushContextualPayload[] syncMode: SyncMode }> { const useMode = !this.completedOnlineDownloadFirstSync ? SyncMode.DownloadFirst : mode if (useMode === SyncMode.Default && !this.completedOnlineDownloadFirstSync) { throw Error('Attempting to default mode sync without having completed initial.') } const uploadPayloads: ServerSyncPushContextualPayload[] = useMode === SyncMode.Default ? await this.payloadsByPreparingForServer(payloads) : [] return { uploadPayloads, syncMode: useMode } } private async createServerSyncOperation( payloads: ServerSyncPushContextualPayload[], checkIntegrity: boolean, source: SyncSource, mode: SyncMode = SyncMode.Default, ) { const syncToken = await this.getLastSyncToken() const paginationToken = await this.getPaginationToken() const operation = new AccountSyncOperation( payloads, async (type: SyncSignal, response?: ServerSyncResponse, stats?: SyncStats) => { switch (type) { case SyncSignal.Response: if (this.dealloced) { return } if (response?.hasError) { this.handleErrorServerResponse(response) } else if (response) { await this.handleSuccessServerResponse(operation, response) } break case SyncSignal.StatusChanged: if (stats) { this.opStatus.setUploadStatus(stats.completedUploadCount, stats.totalUploadCount) } break } }, syncToken, paginationToken, this.apiService, ) this.log( 'Syncing online user', 'source', SyncSource[source], 'operation id', operation.id, 'integrity check', checkIntegrity, 'mode', SyncMode[mode], 'syncToken', syncToken, 'cursorToken', paginationToken, 'payloads', payloads, ) return operation } private async createSyncOperation( payloads: (DecryptedPayloadInterface | DeletedPayloadInterface)[], online: boolean, options: SyncOptions, ): Promise<{ operation: AccountSyncOperation | OfflineSyncOperation; mode: SyncMode }> { if (online) { const { uploadPayloads, syncMode } = await this.getOnlineSyncParameters(payloads, options.mode) return { operation: await this.createServerSyncOperation( uploadPayloads, useBoolean(options.checkIntegrity, false), options.source, syncMode, ), mode: syncMode, } } else { const { uploadPayloads } = this.getOfflineSyncParameters(payloads, options.mode) return { operation: this.createOfflineSyncOperation(uploadPayloads, options.source, options.mode), mode: options.mode || SyncMode.Default, } } } private async performSync(options: SyncOptions): Promise { const { shouldExecuteSync, releaseLock } = this.configureSyncLock() const { items, beginDate, frozenDirtyIndex, neverSyncedDeleted } = await this.prepareForSync(options) const inTimeResolveQueue = this.getPendingRequestsMadeInTimeToPiggyBackOnCurrentRequest() if (!shouldExecuteSync) { return this.deferSyncRequest(options) } if (this.dealloced) { return } const latestItems = await this.prepareForSyncExecution(items, inTimeResolveQueue, beginDate, frozenDirtyIndex) const online = this.sessionManager.online() const { operation, mode: syncMode } = await this.createSyncOperation( latestItems.map((i) => i.payloadRepresentation()), online, options, ) const operationPromise = operation.run() this.currentSyncRequestPromise = operationPromise await operationPromise if (this.dealloced) { return } releaseLock() const { hasError } = await this.handleSyncOperationFinish(operation, options, neverSyncedDeleted, syncMode) if (hasError) { return } const didSyncAgain = await this.potentiallySyncAgainAfterSyncCompletion( syncMode, options, inTimeResolveQueue, online, ) if (didSyncAgain) { return } if (options.checkIntegrity) { await this.notifyEventSync(SyncEvent.SyncRequestsIntegrityCheck, { source: options.source as SyncSource, }) } await this.notifyEventSync(SyncEvent.SyncCompletedWithAllItemsUploadedAndDownloaded, { source: options.source, }) this.resolvePendingSyncRequestsThatMadeItInTimeOfCurrentRequest(inTimeResolveQueue) return undefined } private async handleOfflineResponse(response: OfflineSyncResponse) { this.log('Offline Sync Response', response) const masterCollection = this.payloadManager.getMasterCollection() const delta = new DeltaOfflineSaved(masterCollection, response.savedPayloads) const emit = delta.result() const payloadsToPersist = await this.payloadManager.emitDeltaEmit(emit) await this.persistPayloads(payloadsToPersist) this.opStatus.clearError() await this.notifyEvent(SyncEvent.SingleRoundTripSyncCompleted, response) } private handleErrorServerResponse(response: ServerSyncResponse) { this.log('Sync Error', response) if (response.status === INVALID_SESSION_RESPONSE_STATUS) { void this.notifyEvent(SyncEvent.InvalidSession) } this.opStatus?.setError(response.error) void this.notifyEvent(SyncEvent.SyncError, response) } private async handleSuccessServerResponse(operation: AccountSyncOperation, response: ServerSyncResponse) { if (this._simulate_latency) { await sleep(this._simulate_latency.latency) } this.opStatus.clearError() this.opStatus.setDownloadStatus(response.retrievedPayloads.length) const masterCollection = this.payloadManager.getMasterCollection() const historyMap = this.historyService.getHistoryMapCopy() const resolver = new ServerSyncResponseResolver( { retrievedPayloads: await this.processServerPayloads(response.retrievedPayloads, PayloadSource.RemoteRetrieved), savedPayloads: response.savedPayloads, uuidConflictPayloads: await this.processServerPayloads( response.uuidConflictPayloads, PayloadSource.RemoteRetrieved, ), dataConflictPayloads: await this.processServerPayloads( response.dataConflictPayloads, PayloadSource.RemoteRetrieved, ), rejectedPayloads: await this.processServerPayloads(response.rejectedPayloads, PayloadSource.RemoteRetrieved), }, masterCollection, operation.payloadsSavedOrSaving, historyMap, ) this.log( 'Online Sync Response', 'Operator ID', operation.id, response.rawResponse.data, 'Decrypted payloads', resolver['payloadSet'], ) const emits = resolver.result() for (const emit of emits) { const payloadsToPersist = await this.payloadManager.emitDeltaEmit(emit) await this.persistPayloads(payloadsToPersist) } await Promise.all([ this.setLastSyncToken(response.lastSyncToken as string), this.setPaginationToken(response.paginationToken as string), this.notifyEvent(SyncEvent.SingleRoundTripSyncCompleted, response), ]) } private async processServerPayloads( items: FilteredServerItem[], source: PayloadSource, ): Promise { const payloads = items.map((i) => CreatePayloadFromRawServerItem(i, source)) const { encrypted, deleted } = CreateNonDecryptedPayloadSplit(payloads) const results: FullyFormedPayloadInterface[] = [...deleted] const { rootKeyEncryption, itemsKeyEncryption } = SplitPayloadsByEncryptionType(encrypted) const { results: rootKeyDecryptionResults, map: processedItemsKeys } = await this.decryptServerItemsKeys( rootKeyEncryption || [], ) extendArray(results, rootKeyDecryptionResults) if (itemsKeyEncryption) { const decryptionResults = await this.decryptProcessedServerPayloads(itemsKeyEncryption, processedItemsKeys) extendArray(results, decryptionResults) } return results } private async decryptServerItemsKeys(payloads: EncryptedPayloadInterface[]) { const map: Record> = {} if (payloads.length === 0) { return { results: [], map, } } const rootKeySplit: KeyedDecryptionSplit = { usesRootKeyWithKeyLookup: { items: payloads, }, } const results = await this.protocolService.decryptSplit(rootKeySplit) results.forEach((result) => { if (isDecryptedPayload(result) && result.content_type === ContentType.ItemsKey) { map[result.uuid] = result } }) return { results, map, } } private async decryptProcessedServerPayloads( payloads: EncryptedPayloadInterface[], map: Record>, ): Promise<(EncryptedPayloadInterface | DecryptedPayloadInterface)[]> { return Promise.all( payloads.map(async (encrypted) => { const previouslyProcessedItemsKey: DecryptedPayloadInterface | undefined = map[encrypted.items_key_id as string] const itemsKey = previouslyProcessedItemsKey ? (CreateDecryptedItemFromPayload(previouslyProcessedItemsKey) as ItemsKeyInterface) : undefined const keyedSplit: KeyedDecryptionSplit = {} if (itemsKey) { keyedSplit.usesItemsKey = { items: [encrypted], key: itemsKey, } } else { keyedSplit.usesItemsKeyWithKeyLookup = { items: [encrypted], } } return this.protocolService.decryptSplitSingle(keyedSplit) }), ) } private async handleSyncOperationFinish( operation: AccountSyncOperation | OfflineSyncOperation, options: SyncOptions, neverSyncedDeleted: DeletedItemInterface[], syncMode: SyncMode, ) { this.opStatus.setDidEnd() if (this.opStatus.hasError()) { return { hasError: true } } this.opStatus.reset() this.lastSyncDate = new Date() if (operation instanceof AccountSyncOperation && operation.numberOfItemsInvolved >= this.majorChangeThreshold) { void this.notifyEvent(SyncEvent.MajorDataChange) } if (neverSyncedDeleted.length > 0) { await this.handleNeverSyncedDeleted(neverSyncedDeleted) } if (syncMode !== SyncMode.DownloadFirst) { await this.notifyEvent(SyncEvent.SyncCompletedWithAllItemsUploaded, { source: options.source, }) } return { hasError: false } } private async handleDownloadFirstCompletionAndSyncAgain(online: boolean, options: SyncOptions) { if (online) { this.completedOnlineDownloadFirstSync = true } await this.notifyEvent(SyncEvent.DownloadFirstSyncCompleted) await this.sync({ source: SyncSource.AfterDownloadFirst, checkIntegrity: true, awaitAll: options.awaitAll, }) } private async syncAgainByHandlingRequestsWaitingInResolveQueue(options: SyncOptions) { this.log('Syncing again from resolve queue') const promise = this.sync({ source: SyncSource.ResolveQueue, checkIntegrity: options.checkIntegrity, }) if (options.awaitAll) { await promise } } /** * As part of the just concluded sync operation, more items may have * been dirtied (like conflicts), and the caller may want to await the * full resolution of these items. */ private async syncAgainByHandlingNewDirtyItems(options: SyncOptions) { await this.sync({ source: SyncSource.MoreDirtyItems, checkIntegrity: options.checkIntegrity, awaitAll: options.awaitAll, }) } /** * For timing strategy SyncQueueStrategy.ResolveOnNext. * Execute any callbacks pulled before this sync request began. * Calling resolve on the callbacks should be the last thing we do in this function, * to simulate calling .sync as if it went through straight to the end without having * to be queued. */ private resolvePendingSyncRequestsThatMadeItInTimeOfCurrentRequest(inTimeResolveQueue: SyncPromise[]) { for (const callback of inTimeResolveQueue) { callback.resolve() } } private async potentiallySyncAgainAfterSyncCompletion( syncMode: SyncMode, options: SyncOptions, inTimeResolveQueue: SyncPromise[], online: boolean, ) { if (syncMode === SyncMode.DownloadFirst) { await this.handleDownloadFirstCompletionAndSyncAgain(online, options) this.resolvePendingSyncRequestsThatMadeItInTimeOfCurrentRequest(inTimeResolveQueue) return true } const didSpawnNewRequest = this.popSpawnQueue() const resolveQueueHasRequestsThatDidntMakeItInTime = this.resolveQueue.length > 0 if (!didSpawnNewRequest && resolveQueueHasRequestsThatDidntMakeItInTime) { await this.syncAgainByHandlingRequestsWaitingInResolveQueue(options) this.resolvePendingSyncRequestsThatMadeItInTimeOfCurrentRequest(inTimeResolveQueue) return true } const newItemsNeedingSync = this.itemsNeedingSync() if (newItemsNeedingSync.length > 0) { await this.syncAgainByHandlingNewDirtyItems(options) this.resolvePendingSyncRequestsThatMadeItInTimeOfCurrentRequest(inTimeResolveQueue) return true } return false } /** * Items that have never been synced and marked as deleted should be cleared * as dirty, mapped, then removed from storage. */ private async handleNeverSyncedDeleted(items: DeletedItemInterface[]) { const payloads = items.map((item) => { return item.payloadRepresentation({ dirty: false, }) }) await this.payloadManager.emitPayloads(payloads, PayloadEmitSource.LocalChanged) await this.persistPayloads(payloads) } public async persistPayloads(payloads: FullyFormedPayloadInterface[]) { if (payloads.length === 0 || this.dealloced) { return } return this.storageService.savePayloads(payloads).catch((error) => { void this.notifyEvent(SyncEvent.DatabaseWriteError, error) SNLog.error(error) }) } setInSync(isInSync: boolean): void { if (isInSync === !this.outOfSync) { return } if (isInSync) { this.outOfSync = false void this.notifyEvent(SyncEvent.ExitOutOfSync) } else { this.outOfSync = true void this.notifyEvent(SyncEvent.EnterOutOfSync) } } async handleEvent(event: InternalEventInterface): Promise { if (event.type === IntegrityEvent.IntegrityCheckCompleted) { await this.handleIntegrityCheckEventResponse(event.payload as IntegrityEventPayload) } } private async handleIntegrityCheckEventResponse(eventPayload: IntegrityEventPayload) { const rawPayloads = eventPayload.rawPayloads if (rawPayloads.length === 0) { this.setInSync(true) return } const receivedPayloads = FilterDisallowedRemotePayloadsAndMap(rawPayloads).map((rawPayload) => { return CreatePayloadFromRawServerItem(rawPayload, PayloadSource.RemoteRetrieved) }) const payloadSplit = CreateNonDecryptedPayloadSplit(receivedPayloads) const encryptionSplit = SplitPayloadsByEncryptionType(payloadSplit.encrypted) const keyedSplit = CreateDecryptionSplitWithKeyLookup(encryptionSplit) const decryptionResults = await this.protocolService.decryptSplit(keyedSplit) this.setInSync(false) await this.emitOutOfSyncRemotePayloads([...decryptionResults, ...payloadSplit.deleted]) const shouldCheckIntegrityAgainAfterSync = eventPayload.source !== SyncSource.ResolveOutOfSync await this.sync({ checkIntegrity: shouldCheckIntegrityAgainAfterSync, source: SyncSource.ResolveOutOfSync, }) } private async emitOutOfSyncRemotePayloads(payloads: FullyFormedPayloadInterface[]) { const delta = new DeltaOutOfSync( this.payloadManager.getMasterCollection(), ImmutablePayloadCollection.WithPayloads(payloads), this.historyService.getHistoryMapCopy(), ) const emit = delta.result() await this.payloadManager.emitDeltaEmit(emit) await this.persistPayloads(emit.emits) } override async getDiagnostics(): Promise { const dirtyUuids = Uuids(this.itemsNeedingSync()) return { sync: { syncToken: await this.getLastSyncToken(), cursorToken: await this.getPaginationToken(), dirtyIndexAtLastPresyncSave: this.dirtyIndexAtLastPresyncSave, lastSyncDate: this.lastSyncDate, outOfSync: this.outOfSync, completedOnlineDownloadFirstSync: this.completedOnlineDownloadFirstSync, clientLocked: this.clientLocked, databaseLoaded: this.databaseLoaded, syncLock: this.syncLock, dealloced: this.dealloced, itemsNeedingSync: dirtyUuids, itemsNeedingSyncCount: dirtyUuids.length, pendingRequestCount: this.resolveQueue.length + this.spawnQueue.length, }, } } /** @e2e_testing */ // eslint-disable-next-line camelcase ut_setDatabaseLoaded(loaded: boolean): void { this.databaseLoaded = loaded } /** @e2e_testing */ // eslint-disable-next-line camelcase ut_clearLastSyncDate(): void { this.lastSyncDate = undefined } /** @e2e_testing */ // eslint-disable-next-line camelcase ut_beginLatencySimulator(latency: number): void { this._simulate_latency = { latency: latency || 1000, enabled: true, } } /** @e2e_testing */ // eslint-disable-next-line camelcase ut_endLatencySimulator(): void { this._simulate_latency = undefined } }