Skip to content

Introduce max concurrency to DynamicFlushScheduler #1792

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prev Previous commit
Next Next commit
add logging
  • Loading branch information
mpxr committed Mar 13, 2025
commit 4bb1e1da9dd1670392b90bbe54feeae16e0a66fe
50 changes: 45 additions & 5 deletions apps/webapp/app/v3/dynamicFlushScheduler.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ export class DynamicFlushScheduler<T> {
this.callback = config.callback;
this.isShuttingDown = false;
this.failedBatchCount = 0;

logger.info("Initializing DynamicFlushScheduler", {
batchSize: this.BATCH_SIZE,
flushInterval: this.FLUSH_INTERVAL,
maxConcurrency: this.MAX_CONCURRENCY,
});

this.startFlushTimer();
this.setupShutdownHandlers();

Expand Down Expand Up @@ -60,49 +67,64 @@ export class DynamicFlushScheduler<T> {
async addToBatch(items: T[]): Promise<void> {
this.currentBatch.push(...items);
logger.debug("Adding items to batch", {
batchSize: this.BATCH_SIZE,
newSize: this.currentBatch.length,
currentBatchSize: this.currentBatch.length,
itemsAdded: items.length,
});

if (this.currentBatch.length >= this.BATCH_SIZE) {
logger.debug("Batch size threshold reached, initiating flush", {
batchSize: this.BATCH_SIZE,
currentSize: this.currentBatch.length,
});
await this.flushNextBatch();
this.resetFlushTimer();
}
}

private startFlushTimer(): void {
this.flushTimer = setInterval(() => this.checkAndFlush(), this.FLUSH_INTERVAL);
logger.debug("Started flush timer", { interval: this.FLUSH_INTERVAL });
}

private setupShutdownHandlers() {
process.on("SIGTERM", this.shutdown.bind(this));
process.on("SIGINT", this.shutdown.bind(this));
logger.debug("Shutdown handlers configured");
}

private async shutdown(): Promise<void> {
if (this.isShuttingDown) return;
this.isShuttingDown = true;
logger.log("Shutting down dynamic flush scheduler...");
logger.info("Initiating shutdown of dynamic flush scheduler", {
remainingItems: this.currentBatch.length,
});

await this.checkAndFlush();
this.clearTimer();

logger.log("All items have been flushed.");
logger.info("Dynamic flush scheduler shutdown complete", {
totalFailedBatches: this.failedBatchCount,
});
}

private clearTimer(): void {
if (this.flushTimer) {
clearInterval(this.flushTimer);
logger.debug("Flush timer cleared");
}
}

private resetFlushTimer(): void {
this.clearTimer();
this.startFlushTimer();
logger.debug("Flush timer reset");
}

private async checkAndFlush(): Promise<void> {
if (this.currentBatch.length > 0) {
logger.debug("Periodic flush check triggered", {
currentBatchSize: this.currentBatch.length,
});
await this.flushNextBatch();
}
}
Expand All @@ -115,15 +137,26 @@ export class DynamicFlushScheduler<T> {
batches.push(this.currentBatch.splice(0, this.BATCH_SIZE));
}

logger.info("Starting batch flush", {
numberOfBatches: batches.length,
totalItems: batches.reduce((sum, batch) => sum + batch.length, 0),
});

const promises = batches.map((batch) =>
this.concurrencyLimiter(async () => {
const batchId = nanoid();
try {
logger.debug("Processing batch", {
batchId,
batchSize: batch.length,
});
await this.callback(batchId, batch!);
} catch (error) {
logger.error("Error inserting batch:", {
logger.error("Error processing batch", {
batchId,
error,
batchSize: batch.length,
errorMessage: error instanceof Error ? error.message : "Unknown error",
});
throw error;
}
Expand All @@ -134,5 +167,12 @@ export class DynamicFlushScheduler<T> {

const failedBatches = results.filter((result) => result.status === "rejected").length;
this.failedBatchCount += failedBatches;

logger.info("Batch flush complete", {
totalBatches: batches.length,
successfulBatches: batches.length - failedBatches,
failedBatches,
totalFailedBatches: this.failedBatchCount,
});
}
}