Add internal spans to event repo #1678

Merged 1 commit on Feb 7, 2025
8 changes: 5 additions & 3 deletions apps/webapp/app/v3/dynamicFlushScheduler.server.ts
@@ -1,7 +1,9 @@
import { nanoid } from "nanoid";

export type DynamicFlushSchedulerConfig<T> = {
batchSize: number;
flushInterval: number;
callback: (batch: T[]) => Promise<void>;
callback: (flushId: string, batch: T[]) => Promise<void>;
};

export class DynamicFlushScheduler<T> {
@@ -10,7 +12,7 @@ export class DynamicFlushScheduler<T> {
private readonly BATCH_SIZE: number;
private readonly FLUSH_INTERVAL: number;
private flushTimer: NodeJS.Timeout | null;
private readonly callback: (batch: T[]) => Promise<void>;
private readonly callback: (flushId: string, batch: T[]) => Promise<void>;

constructor(config: DynamicFlushSchedulerConfig<T>) {
this.batchQueue = [];
@@ -57,7 +59,7 @@ export class DynamicFlushScheduler<T> {

const batchToFlush = this.batchQueue.shift();
try {
await this.callback(batchToFlush!);
await this.callback(nanoid(), batchToFlush!);
if (this.batchQueue.length > 0) {
this.flushNextBatch();
}
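For reference, a minimal usage sketch of the new callback signature — not part of the diff. The scheduler and its addToBatch method come from the file above; the batch type and config values here are illustrative assumptions.

```ts
import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";

type MyEvent = { message: string };

// The callback now receives a per-flush nanoid, so anything the callback logs or
// traces can be correlated with a specific flush.
const scheduler = new DynamicFlushScheduler<MyEvent>({
  batchSize: 100,
  flushInterval: 1_000,
  callback: async (flushId, batch) => {
    console.log(`flushing ${batch.length} events (flush ${flushId})`);
    // ...persist the batch here
  },
});

scheduler.addToBatch([{ message: "hello" }]);
```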
158 changes: 101 additions & 57 deletions apps/webapp/app/v3/eventRepository.server.ts
@@ -1,4 +1,4 @@
import { Attributes, Link, TraceFlags } from "@opentelemetry/api";
import { Attributes, Link, trace, TraceFlags, Tracer } from "@opentelemetry/api";
import { RandomIdGenerator } from "@opentelemetry/sdk-trace-base";
import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions";
import {
@@ -32,6 +32,8 @@ import { singleton } from "~/utils/singleton";
import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";
import { startActiveSpan } from "./tracer.server";
import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server";
import { startSpan } from "./tracing.server";
import { nanoid } from "nanoid";

const MAX_FLUSH_DEPTH = 5;

@@ -99,6 +101,7 @@ export type EventRepoConfig = {
batchInterval: number;
redis: RedisWithClusterOptions;
retentionInDays: number;
tracer?: Tracer;
};

export type QueryOptions = Prisma.TaskEventWhereInput;
@@ -202,6 +205,8 @@ export class EventRepository {
private _randomIdGenerator = new RandomIdGenerator();
private _redisPublishClient: RedisClient;
private _subscriberCount = 0;
private _tracer: Tracer;
private _lastFlushedAt: Date | undefined;

get subscriberCount() {
return this._subscriberCount;
@@ -219,22 +224,23 @@
});

this._redisPublishClient = createRedisClient("trigger:eventRepoPublisher", this._config.redis);
this._tracer = _config.tracer ?? trace.getTracer("eventRepo", "0.0.1");
}

async insert(event: CreatableEvent) {
this._flushScheduler.addToBatch([event]);
}

async insertImmediate(event: CreatableEvent) {
await this.#flushBatch([event]);
await this.#flushBatch(nanoid(), [event]);
}

async insertMany(events: CreatableEvent[]) {
this._flushScheduler.addToBatch(events);
}

async insertManyImmediate(events: CreatableEvent[]) {
return await this.#flushBatch(events);
return await this.#flushBatch(nanoid(), events);
}

async completeEvent(spanId: string, options?: UpdateEventOptions) {
@@ -1019,42 +1025,56 @@ export class EventRepository {
};
}

async #flushBatch(batch: CreatableEvent[]) {
const events = excludePartialEventsWithCorrespondingFullEvent(batch);
async #flushBatch(flushId: string, batch: CreatableEvent[]) {
return await startSpan(this._tracer, "flushBatch", async (span) => {
const events = excludePartialEventsWithCorrespondingFullEvent(batch);

const flushedEvents = await this.#doFlushBatch(events);
span.setAttribute("flush_id", flushId);
span.setAttribute("event_count", events.length);
span.setAttribute("partial_event_count", batch.length - events.length);
span.setAttribute(
"last_flush_in_ms",
this._lastFlushedAt ? new Date().getTime() - this._lastFlushedAt.getTime() : 0
);

if (flushedEvents.length !== events.length) {
logger.debug("[EventRepository][flushBatch] Failed to insert all events", {
attemptCount: events.length,
successCount: flushedEvents.length,
});
}
const flushedEvents = await this.#doFlushBatch(flushId, events);

this._lastFlushedAt = new Date();

if (flushedEvents.length !== events.length) {
logger.debug("[EventRepository][flushBatch] Failed to insert all events", {
attemptCount: events.length,
successCount: flushedEvents.length,
});

span.setAttribute("failed_event_count", events.length - flushedEvents.length);
}

this.#publishToRedis(flushedEvents);
this.#publishToRedis(flushedEvents);
});
}

async #doFlushBatch(events: CreatableEvent[], depth: number = 1): Promise<CreatableEvent[]> {
try {
await this.db.taskEvent.createMany({
data: events as Prisma.TaskEventCreateManyInput[],
});
async #doFlushBatch(
flushId: string,
events: CreatableEvent[],
depth: number = 1
): Promise<CreatableEvent[]> {
return await startSpan(this._tracer, "doFlushBatch", async (span) => {
try {
span.setAttribute("event_count", events.length);
span.setAttribute("depth", depth);
span.setAttribute("flush_id", flushId);

return events;
} catch (error) {
if (error instanceof Prisma.PrismaClientUnknownRequestError) {
logger.error("Failed to insert events, most likely because of null characters", {
error: {
name: error.name,
message: error.message,
stack: error.stack,
clientVersion: error.clientVersion,
},
await this.db.taskEvent.createMany({
data: events as Prisma.TaskEventCreateManyInput[],
});

if (events.length === 1) {
logger.debug("Attempting to insert event individually and it failed", {
event: events[0],
span.setAttribute("inserted_event_count", events.length);

return events;
} catch (error) {
if (error instanceof Prisma.PrismaClientUnknownRequestError) {
logger.error("Failed to insert events, most likely because of null characters", {
error: {
name: error.name,
message: error.message,
@@ -1063,38 +1083,62 @@
},
});

return [];
}
if (events.length === 1) {
logger.debug("Attempting to insert event individually and it failed", {
event: events[0],
error: {
name: error.name,
message: error.message,
stack: error.stack,
clientVersion: error.clientVersion,
},
});

if (depth > MAX_FLUSH_DEPTH) {
logger.error("Failed to insert events, reached maximum depth", {
error: {
name: error.name,
message: error.message,
stack: error.stack,
clientVersion: error.clientVersion,
},
depth,
eventsCount: events.length,
});
span.setAttribute("failed_event_count", 1);

return [];
}
return [];
}

// Split the events into two batches, and recursively try to insert them.
const middle = Math.floor(events.length / 2);
const [firstHalf, secondHalf] = [events.slice(0, middle), events.slice(middle)];
if (depth > MAX_FLUSH_DEPTH) {
logger.error("Failed to insert events, reached maximum depth", {
error: {
name: error.name,
message: error.message,
stack: error.stack,
clientVersion: error.clientVersion,
},
depth,
eventsCount: events.length,
});

const [firstHalfEvents, secondHalfEvents] = await Promise.all([
this.#doFlushBatch(firstHalf, depth + 1),
this.#doFlushBatch(secondHalf, depth + 1),
]);
span.setAttribute("reached_max_flush_depth", true);
span.setAttribute("failed_event_count", events.length);

return firstHalfEvents.concat(secondHalfEvents);
}
return [];
}

throw error;
}
// Split the events into two batches, and recursively try to insert them.
const middle = Math.floor(events.length / 2);
const [firstHalf, secondHalf] = [events.slice(0, middle), events.slice(middle)];

return await startSpan(this._tracer, "bisectBatch", async (span) => {
span.setAttribute("first_half_count", firstHalf.length);
span.setAttribute("second_half_count", secondHalf.length);
span.setAttribute("depth", depth);
span.setAttribute("flush_id", flushId);

const [firstHalfEvents, secondHalfEvents] = await Promise.all([
this.#doFlushBatch(flushId, firstHalf, depth + 1),
this.#doFlushBatch(flushId, secondHalf, depth + 1),
]);

return firstHalfEvents.concat(secondHalfEvents);
});
}

throw error;
}
});
}

async #publishToRedis(events: CreatableEvent[]) {
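The instrumentation above relies on a startSpan helper imported from ./tracing.server, whose implementation is not part of this diff. A minimal sketch of the contract the calls assume — start an active span, run the callback, record failures, always end the span; the exact error handling is an assumption:

```ts
import { Span, SpanStatusCode, Tracer } from "@opentelemetry/api";

// Assumed shape of the startSpan helper used by flushBatch, doFlushBatch, and bisectBatch.
export async function startSpan<T>(
  tracer: Tracer,
  name: string,
  fn: (span: Span) => Promise<T>
): Promise<T> {
  return tracer.startActiveSpan(name, async (span) => {
    try {
      return await fn(span);
    } catch (error) {
      span.recordException(error as Error);
      span.setStatus({ code: SpanStatusCode.ERROR });
      throw error;
    } finally {
      span.end();
    }
  });
}
```

Because every span in this path sets a flush_id attribute, a single flush can be followed from flushBatch through doFlushBatch and any bisectBatch retries.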
60 changes: 37 additions & 23 deletions apps/webapp/app/v3/otlpExporter.server.ts
@@ -25,58 +25,72 @@ import {
CreatableEventEnvironmentType,
} from "./eventRepository.server";
import { logger } from "~/services/logger.server";
import { trace, Tracer } from "@opentelemetry/api";
import { startSpan } from "./tracing.server";

export type OTLPExporterConfig = {
batchSize: number;
batchInterval: number;
};

class OTLPExporter {
private _tracer: Tracer;

constructor(
private readonly _eventRepository: EventRepository,
private readonly _verbose: boolean
) {}
) {
this._tracer = trace.getTracer("otlp-exporter");
}

async exportTraces(
request: ExportTraceServiceRequest,
immediate: boolean = false
): Promise<ExportTraceServiceResponse> {
this.#logExportTracesVerbose(request);
return await startSpan(this._tracer, "exportTraces", async (span) => {
this.#logExportTracesVerbose(request);

const events = this.#filterResourceSpans(request.resourceSpans).flatMap((resourceSpan) => {
return convertSpansToCreateableEvents(resourceSpan);
});
const events = this.#filterResourceSpans(request.resourceSpans).flatMap((resourceSpan) => {
return convertSpansToCreateableEvents(resourceSpan);
});

this.#logEventsVerbose(events);
this.#logEventsVerbose(events);

if (immediate) {
await this._eventRepository.insertManyImmediate(events);
} else {
await this._eventRepository.insertMany(events);
}
span.setAttribute("event_count", events.length);

if (immediate) {
await this._eventRepository.insertManyImmediate(events);
} else {
await this._eventRepository.insertMany(events);
}

return ExportTraceServiceResponse.create();
return ExportTraceServiceResponse.create();
});
}

async exportLogs(
request: ExportLogsServiceRequest,
immediate: boolean = false
): Promise<ExportLogsServiceResponse> {
this.#logExportLogsVerbose(request);
return await startSpan(this._tracer, "exportLogs", async (span) => {
this.#logExportLogsVerbose(request);

const events = this.#filterResourceLogs(request.resourceLogs).flatMap((resourceLog) => {
return convertLogsToCreateableEvents(resourceLog);
});
const events = this.#filterResourceLogs(request.resourceLogs).flatMap((resourceLog) => {
return convertLogsToCreateableEvents(resourceLog);
});

this.#logEventsVerbose(events);
this.#logEventsVerbose(events);

if (immediate) {
await this._eventRepository.insertManyImmediate(events);
} else {
await this._eventRepository.insertMany(events);
}
span.setAttribute("event_count", events.length);

return ExportLogsServiceResponse.create();
if (immediate) {
await this._eventRepository.insertManyImmediate(events);
} else {
await this._eventRepository.insertMany(events);
}

return ExportLogsServiceResponse.create();
});
}

#logEventsVerbose(events: CreatableEvent[]) {
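Both files obtain a tracer: the exporter calls trace.getTracer("otlp-exporter") directly, while EventRepoConfig gains an optional tracer field that defaults to trace.getTracer("eventRepo", "0.0.1"). A hedged sketch of injecting one, for example from a test provider — the non-tracer values are placeholders, and only the fields visible in this diff are shown:

```ts
import { trace } from "@opentelemetry/api";
import type { EventRepoConfig } from "./eventRepository.server";

// Illustrative values only; omitting `tracer` falls back to
// trace.getTracer("eventRepo", "0.0.1") inside the EventRepository constructor.
const config: Partial<EventRepoConfig> = {
  batchInterval: 1_000,
  retentionInDays: 30,
  // redis: { ... }, // RedisWithClusterOptions for the target deployment
  tracer: trace.getTracer("event-repo-tests"),
};
```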
7 changes: 5 additions & 2 deletions apps/webapp/app/v3/services/finalizeTaskRun.server.ts
@@ -122,9 +122,12 @@ export class FinalizeTaskRunService extends BaseService {
const result = await resumeService.call({ id: run.id });

if (result.success) {
logger.log("FinalizeTaskRunService: Resumed dependent parents", { result, run });
logger.log("FinalizeTaskRunService: Resumed dependent parents", { result, run: run.id });
} else {
logger.error("FinalizeTaskRunService: Failed to resume dependent parents", { result, run });
logger.error("FinalizeTaskRunService: Failed to resume dependent parents", {
result,
run: run.id,
});
}

//enqueue alert
1 change: 1 addition & 0 deletions apps/webapp/app/v3/tracer.server.ts
@@ -8,6 +8,7 @@ import {
SpanKind,
SpanOptions,
SpanStatusCode,
Tracer,
diag,
trace,
} from "@opentelemetry/api";