Skip to content

TEST / DEBUG PR: Submission save+sub stability / performance #4214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/app/submission/objects/submission-objects.actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ export class SaveSubmissionFormAction implements Action {
payload: {
submissionId: string;
isManual?: boolean;
timestamp: number;
};

/**
Expand All @@ -442,7 +443,8 @@ export class SaveSubmissionFormAction implements Action {
* the submission's ID
*/
constructor(submissionId: string, isManual: boolean = false) {
this.payload = { submissionId, isManual };
this.payload = { submissionId, isManual, timestamp: Date.now() };
console.log(`Creating SaveSubmissionFormAction for submission ${submissionId} at ${new Date().toISOString()}`);
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/app/submission/sections/form/section-form.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,9 @@ export class SubmissionSectionFormComponent extends SectionModelComponent {
* Initialize all subscriptions
*/
subscriptions(): void {
// clear existing subscriptions first
this.onSectionDestroy();
this.subs = [];
this.subs.push(
/**
* Subscribe to form's data
Expand All @@ -426,6 +429,7 @@ export class SubmissionSectionFormComponent extends SectionModelComponent {
this.updateForm(sectionState);
}),
);
console.log(`Section ${this.sectionData.id} has ${this.subs.length} active subscriptions`);
}

/**
Expand Down
185 changes: 169 additions & 16 deletions src/app/submission/submission.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@ import {
} from '@ngrx/store';
import { TranslateService } from '@ngx-translate/core';
import {
Observable,
of as observableOf,
Subscription,
EMPTY,
Observable, of,
of as observableOf, ReplaySubject, share, Subject,
Subscription, timeout, TimeoutError,
timer as observableTimer,
} from 'rxjs';
import {
catchError,
concatMap,
concatMap, debounceTime,
distinctUntilChanged,
filter,
filter, finalize,
find,
map,
startWith,
startWith, switchMap,
take,
tap,
} from 'rxjs/operators';
Expand Down Expand Up @@ -102,13 +103,20 @@ export class SubmissionService {
*/
protected autoSaveSub: Subscription;

private _currentSaveSubmissionId;
private _saveDebouncer = new Subject<{id: string, manual: boolean}>();
private activeStateSubscriptions: Map<string, Subscription[]> = new Map();

/**
* Observable used as timer
*/
protected timer$: Observable<any>;

private workspaceLinkPath = 'workspaceitems';
private workflowLinkPath = 'workflowitems';

// testing sub stuff
private submissionStateSubjects: Map<string, ReplaySubject<SubmissionObjectEntry>>;
/**
* Initialize service variables
* @param {NotificationsService} notificationsService
Expand All @@ -130,8 +138,48 @@ export class SubmissionService {
protected searchService: SearchService,
protected requestService: RequestService,
protected jsonPatchOperationService: SubmissionJsonPatchOperationsService) {
this._saveDebouncer.pipe(
debounceTime(300),
//distinctUntilChanged((prev, curr) => prev.id === curr.id),
).subscribe(({ id, manual }) => {
this._dispatchSaveImpl(id, manual);
});
}

// testing sub stuff
// Add this method to manage subscriptions better
private getOrCreateStateSubscription(submissionId: string): Observable<SubmissionObjectEntry> {
// Check if we already have an active subject for this submission
if (!this.submissionStateSubjects) {
this.submissionStateSubjects = new Map();
}

if (!this.submissionStateSubjects.has(submissionId)) {
// Create a new ReplaySubject to cache the latest state
const subject = new ReplaySubject<SubmissionObjectEntry>(1);

// Set up a single subscription to the store that feeds our subject
const subscription = this.store.select(submissionObjectFromIdSelector(submissionId))
.pipe(
filter((submission: SubmissionObjectEntry) => isNotUndefined(submission))
)
.subscribe(subject);

// Track this subscription so we can clean it up later
if (!this.activeStateSubscriptions.has(submissionId)) {
this.activeStateSubscriptions.set(submissionId, []);
}
this.activeStateSubscriptions.get(submissionId).push(subscription);

// Store our subject
this.submissionStateSubjects.set(submissionId, subject);
}

// Return the subject as an observable
return this.submissionStateSubjects.get(submissionId).asObservable();
}


/**
* Dispatch a new [ChangeSubmissionCollectionAction]
*
Expand Down Expand Up @@ -269,11 +317,60 @@ export class SubmissionService {
* whether is a manual save, default false
*/
dispatchSave(submissionId, manual?: boolean) {
this.getSubmissionSaveProcessingStatus(submissionId).pipe(
find((isPending: boolean) => !isPending),
).subscribe(() => {
this.store.dispatch(new SaveSubmissionFormAction(submissionId, manual));
});
console.log('qeueing save for', submissionId);
this._saveDebouncer.next({id: submissionId, manual: manual});
}
_dispatchSaveImpl(submissionId, manual?: boolean) {
console.log('Dispatching save for', submissionId);
const saveInProgress = this._currentSaveSubmissionId === submissionId;
if (saveInProgress) {
console.log(`Save already in progress, ignoring save for ${submissionId}`);
return;
}

this._currentSaveSubmissionId = submissionId;

// Check the current save processing status first
this.getSubmissionObject(submissionId).pipe(
take(1),
map((state: SubmissionObjectEntry) => state.savePending),
switchMap((isPending: boolean) => {
if (!isPending) {
// If not pending, proceed immediately
return of(false);
} else {
// Otherwise wait for non-pending state
return this.getSubmissionSaveProcessingStatus(submissionId).pipe(
find((pendingState: boolean) => !pendingState),
timeout(10000)
);
}
}),
finalize(() => {
console.log(`Clearing save in progress flag for ${submissionId}`);
this._currentSaveSubmissionId = null;
})
).subscribe({
next: () => {
console.log('Proceeding with save dispatch');
this.store.dispatch(new SaveSubmissionFormAction(submissionId, manual));
},
error: error => {
console.error('Error waiting for non-pending state', error);
// Error handling logic - potentially still dispatch the save if it was a timeout
if (error instanceof TimeoutError) {
console.log('Timeout occurred, forcing save dispatch');
this.store.dispatch(new SaveSubmissionFormAction(submissionId, manual));
}
}
})
// ).subscribe(() => {
// console.log('Proceeding with save dispatch');
// this.store.dispatch(new SaveSubmissionFormAction(submissionId, manual));
// }, error => {
// console.error('Error waiting for non-pending state', error);
// this._currentSaveSubmissionId = null;
// });
}

/**
Expand Down Expand Up @@ -319,9 +416,18 @@ export class SubmissionService {
* @return Observable<SubmissionObjectEntry>
* observable of SubmissionObjectEntry
*/
// getSubmissionObject(submissionId: string): Observable<SubmissionObjectEntry> {
// console.log(`Getting submission object for submission ${submissionId}, active subs:`, this.activeStateSubscriptions.get(submissionId)?.length || 0);
// return this.store.select(submissionObjectFromIdSelector(submissionId)).pipe(
// filter((submission: SubmissionObjectEntry) => isNotUndefined(submission)),
// share()
// );
// }
//
getSubmissionObject(submissionId: string): Observable<SubmissionObjectEntry> {
return this.store.select(submissionObjectFromIdSelector(submissionId)).pipe(
filter((submission: SubmissionObjectEntry) => isNotUndefined(submission)));
console.log(`Getting submission object for submission ${submissionId}, active subs:`,
this.activeStateSubscriptions.get(submissionId)?.length || 0);
return this.getOrCreateStateSubscription(submissionId);
}

/**
Expand Down Expand Up @@ -464,13 +570,26 @@ export class SubmissionService {
* @return Observable<boolean>
* observable with submission save processing status
*/
// getSubmissionSaveProcessingStatus(submissionId: string): Observable<boolean> {
// const requestId = Date.now(); // unique req id
// console.log(`${requestId}: checking save status for ${submissionId}`);
// return this.getSubmissionObject(submissionId).pipe(
// tap((submission) => {
// console.log('Current submission state:', submission.savePending ? 'PENDING' : 'NOT PENDING');
// }),
// map((state: SubmissionObjectEntry) => state.savePending),
// distinctUntilChanged(),
// tap(isPending => console.log(`Save pending status changed to: ${isPending ? 'PENDING' : 'NOT PENDING'}`)),
// startWith(false));
// }
getSubmissionSaveProcessingStatus(submissionId: string): Observable<boolean> {
return this.getSubmissionObject(submissionId).pipe(
map((state: SubmissionObjectEntry) => state.savePending),
distinctUntilChanged(),
startWith(false));
// Only log when the status actually changes
tap(isPending => console.log(`Save pending status: ${isPending ? 'PENDING' : 'NOT PENDING'}`))
);
}

/**
* Return the deposit processing status of the submission
*
Expand Down Expand Up @@ -568,13 +687,47 @@ export class SubmissionService {
).subscribe();
}




// clearSubmissionSubscriptions(submissionId: string) {
// const subs = this.activeStateSubscriptions.get(submissionId) || [];
// console.log(`Clearing ${subs.length} subscriptions for submission ${submissionId}`);
//
// subs.forEach(sub => {
// if (!sub.closed) {
// sub.unsubscribe();
// }
// });
//
// this.activeStateSubscriptions.set(submissionId, []);
// }
clearSubmissionSubscriptions(submissionId: string) {
const subs = this.activeStateSubscriptions.get(submissionId) || [];
console.log(`Clearing ${subs.length} subscriptions for submission ${submissionId}`);

subs.forEach(sub => {
if (!sub.closed) {
sub.unsubscribe();
}
});

this.activeStateSubscriptions.set(submissionId, []);

// Also clear any state subjects
if (this.submissionStateSubjects && this.submissionStateSubjects.has(submissionId)) {
this.submissionStateSubjects.delete(submissionId);
}
}
/**
* Dispatch a new [CancelSubmissionFormAction]
*/
resetAllSubmissionObjects() {
Array.from(this.activeStateSubscriptions.keys()).forEach(submissionId => {
this.clearSubmissionSubscriptions(submissionId);
});
this.store.dispatch(new CancelSubmissionFormAction());
}

/**
* Dispatch a new [ResetSubmissionFormAction]
*
Expand Down
Loading