diff --git a/internal/component/database_observability/mysql/collector/explain_plan.go b/internal/component/database_observability/mysql/collector/explain_plan.go index df2edf6028..55dec1d3e8 100644 --- a/internal/component/database_observability/mysql/collector/explain_plan.go +++ b/internal/component/database_observability/mysql/collector/explain_plan.go @@ -607,7 +607,7 @@ func (c *ExplainPlan) fetchExplainPlans(ctx context.Context) error { continue } - if !strings.HasPrefix(strings.ToLower(qi.queryText), "select") { + if !strings.HasPrefix(strings.ToLower(qi.queryText), "select") && !strings.HasPrefix(strings.ToLower(qi.queryText), "with") { continue } diff --git a/internal/component/database_observability/mysql/collector/explain_plan_test.go b/internal/component/database_observability/mysql/collector/explain_plan_test.go index 2fd6fb27e7..a3f90ffa13 100644 --- a/internal/component/database_observability/mysql/collector/explain_plan_test.go +++ b/internal/component/database_observability/mysql/collector/explain_plan_test.go @@ -1,6 +1,7 @@ package collector import ( + "bytes" "fmt" "os" "testing" @@ -1506,9 +1507,12 @@ func TestExplainPlan(t *testing.T) { }) require.NoError(t, err) + err = mock.ExpectationsWereMet() + require.NoError(t, err) + t.Run("uses argument value on first request", func(t *testing.T) { nextSeen := lastSeen.Add(time.Second * 45) - mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_text", @@ -1530,7 +1534,7 @@ func TestExplainPlan(t *testing.T) { }) t.Run("uses oldest last seen value on subsequent requests", func(t *testing.T) { - mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_text", @@ -1544,5 +1548,172 @@ func TestExplainPlan(t *testing.T) { err := c.populateQueryCache(t.Context()) require.NoError(t, err) }) + + err = mock.ExpectationsWereMet() + require.NoError(t, err) + }) + + t.Run("query validation", func(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + mock.ExpectQuery(selectDBSchemaVersion).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{ + "version", + }).AddRow( + "8.0.32", + )) + + lastSeen := time.Now().Add(-time.Hour) + lokiClient := loki_fake.NewClient(func() {}) + defer lokiClient.Stop() + + logBuffer := bytes.NewBuffer(nil) + + c, err := NewExplainPlan(ExplainPlanArguments{ + DB: db, + Logger: log.NewLogfmtLogger(log.NewSyncWriter(logBuffer)), + InstanceKey: "mysql-db", + ScrapeInterval: time.Second, + PerScrapeRatio: 1, + EntryHandler: lokiClient, + InitialLookback: lastSeen, + }) + require.NoError(t, err) + + t.Run("skips truncated queries", func(t *testing.T) { + logBuffer.Reset() + mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + "schema_name", + "digest", + "query_sample_text", + "last_seen", + }).AddRow( + "some_schema", + "some_digest", + "select * from some_table where ...", + lastSeen, + )) + + err = c.fetchExplainPlans(t.Context()) + require.NoError(t, err) + + lokiEntries := lokiClient.Received() + require.Equal(t, 0, len(lokiEntries)) + + require.Contains(t, logBuffer.String(), "skipping truncated query") + require.NotContains(t, logBuffer.String(), "error") + }) + + t.Run("skips non-select queries", func(t *testing.T) { + lokiClient.Clear() + logBuffer.Reset() + mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + "schema_name", + "digest", + "query_sample_text", + "last_seen", + }).AddRow( + "some_schema", + "some_digest", + "update some_table set col = 1 where id = 1", + lastSeen, + ).AddRow( + "some_schema", + "some_digest", + "delete from some_table", + lastSeen, + ).AddRow( + "some_schema", + "some_digest", + "insert into some_table (col) values (1)", + lastSeen, + )) + + err = c.fetchExplainPlans(t.Context()) + require.NoError(t, err) + + lokiEntries := lokiClient.Received() + require.Equal(t, 0, len(lokiEntries)) + + require.NotContains(t, logBuffer.String(), "error") + }) + + t.Run("passes queries beginning in select", func(t *testing.T) { + lokiClient.Clear() + logBuffer.Reset() + mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + "schema_name", + "digest", + "query_sample_text", + "last_seen", + }).AddRow( + "some_schema", + "some_digest", + "select * from some_table where id = 1", + lastSeen, + )) + + mock.ExpectExec("USE `some_schema`").WithoutArgs().WillReturnResult(sqlmock.NewResult(0, 0)) + + mock.ExpectQuery(selectExplainPlanPrefix + "select * from some_table where id = 1").WillReturnRows(sqlmock.NewRows([]string{ + "json", + }).AddRow( + []byte(`{"query_block": {"select_id": 1}}`), + )) + + err = c.fetchExplainPlans(t.Context()) + require.NoError(t, err) + + require.NotContains(t, logBuffer.String(), "error") + + require.Eventually( + t, + func() bool { return len(lokiClient.Received()) == 1 }, + 5*time.Second, + 10*time.Millisecond, + "did not receive the explain plan output log message within the timeout", + ) + }) + + t.Run("passes queries beginning in with", func(t *testing.T) { + lokiClient.Clear() + logBuffer.Reset() + mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + "schema_name", + "digest", + "query_sample_text", + "last_seen", + }).AddRow( + "some_schema", + "some_digest", + "with cte as (select * from some_table where id = 1) select * from cte", + lastSeen, + )) + + mock.ExpectExec("USE `some_schema`").WithoutArgs().WillReturnResult(sqlmock.NewResult(0, 0)) + + mock.ExpectQuery(selectExplainPlanPrefix + "with cte as (select * from some_table where id = 1) select * from cte").WillReturnRows(sqlmock.NewRows([]string{ + "json", + }).AddRow( + []byte(`{"query_block": {"select_id": 1}}`), + )) + + err = c.fetchExplainPlans(t.Context()) + require.NoError(t, err) + + require.NotContains(t, logBuffer.String(), "error") + + require.Eventually( + t, + func() bool { return len(lokiClient.Received()) == 1 }, + 5*time.Second, + 10*time.Millisecond, + "did not receive the explain plan output log message within the timeout", + ) + }) + + err = mock.ExpectationsWereMet() + require.NoError(t, err) }) }