Skip to content

Commit 221ac06

Browse files
committed
database_observability: add auto-enable setup consumers to query_sample
Introduce a check in the `query_sample` collector to automatically enable the `events_statements_cpu` consumer in the `performance_schema.setup_consumers` table. This is guarded by two config options, both of which must be set to `true`: - `allow_update_performance_schema_settings`: allows updates to `performance_schema` settings altogether. - `query_sample_auto_enable_setup_consumers`: allows the `query_sample` collector to auto-enable the consumer it needs.
1 parent da78243 commit 221ac06

File tree

6 files changed

+259
-36
lines changed

6 files changed

+259
-36
lines changed

CHANGELOG.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ v1.10.0-rc.0
4848
- (_Experimental_) Additions to experimental `database_observability.mysql` component:
4949
- Add `explain_plan` collector to `database_observability.mysql` component. (@rgeyer)
5050
- `locks`: addition of data locks collector (@gaantunes @fridgepoet)
51-
- Query sample collector is now enabled by default (@matthewnolf)
51+
- `query_sample` collector is now enabled by default (@matthewnolf)
52+
- `query_sample` collector now supports auto-enabling the necessary `setup_consumers` settings (@cristiangreco)
5253

5354
- (_Experimental_) `prometheus.write.queue` add support for exemplars. (@dehaansa)
5455

@@ -108,8 +109,8 @@ v1.9.2
108109
### Other changes
109110

110111
- Add no-op blocks and attributes to the `prometheus.exporter.windows` component (@ptodev).
111-
Version 1.9.0 of Alloy removed the `msmq` block, as well as the `enable_v2_collector`,
112-
`where_clause`, and `use_api` attributes in the `service` block.
112+
Version 1.9.0 of Alloy removed the `msmq` block, as well as the `enable_v2_collector`,
113+
`where_clause`, and `use_api` attributes in the `service` block.
113114
This made it difficult for users to upgrade, so those attributes have now been made a no-op instead of being removed.
114115

115116
v1.9.1

docs/sources/reference/components/database_observability/database_observability.mysql.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ You can use the following arguments with `database_observability.mysql`:
3939
| `locks_collect_interval` | `duration` | How frequently to collect locks information from database. | `"30s"` | no |
4040
| `locks_threshold` | `duration` | Threshold for locks to be considered slow. If a lock exceeds this duration, it will be logged. | `"1s"` | no |
4141
| `setup_consumers_collect_interval` | `duration` | How frequently to collect performance_schema.setup_consumers information from the database. | `"1h"` | no |
42+
| `allow_update_performance_schema_settings` | `boolean` | Whether to allow updates to `performance_schema` settings. | `false` | no |
43+
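| `query_sample_auto_enable_setup_consumers` | `boolean` | Whether to allow the `query_sample` collector to enable the `events_statements_cpu` consumer in `performance_schema.setup_consumers`. Only takes effect when `allow_update_performance_schema_settings` is also `true`. | `false` | no |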
4244

4345
The following collectors are configurable:
4446

internal/component/database_observability/README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,27 @@ Use this statement to enable the consumer if it's disabled:
8888
UPDATE performance_schema.setup_consumers SET ENABLED = 'YES' WHERE NAME = 'events_statements_cpu';
8989
```
9090

91+
Note that the `events_statements_cpu` consumer might be disabled again when the database is restarted. If you prefer that Alloy verify and enable the consumer on your behalf, extend the grants of the `db-o11y` user:
92+
93+
```sql
94+
GRANT UPDATE ON performance_schema.setup_consumers TO 'db-o11y'@'%';
95+
```
96+
97+
and additionally enable these options:
98+
99+
```
100+
database_observability.mysql "mysql_<your_DB_name>" {
101+
enable_collectors = ["query_sample"]
102+
103+
// Global option to allow writing to performance_schema tables
104+
allow_update_performance_schema_settings = true
105+
106+
// Option to allow the `query_sample` collector to
107+
// enable the 'events_statements_cpu' consumer
108+
query_sample_auto_enable_setup_consumers = true
109+
}
110+
```
111+
91112
7. Optionally enable the `events_waits_current` and `events_waits_history` consumers if you want to collect wait events for each query sample. Verify the current settings:
92113

93114
```promql

internal/component/database_observability/mysql/collector/query_sample.go

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -70,23 +70,30 @@ WHERE
7070
AND statements.CURRENT_SCHEMA NOT IN ('mysql', 'performance_schema', 'sys', 'information_schema')
7171
%s %s`
7272

73+
const updateSetupConsumers = `
74+
UPDATE performance_schema.setup_consumers
75+
SET enabled = 'yes'
76+
WHERE name = 'events_statements_cpu'`
77+
7378
type QuerySampleArguments struct {
74-
DB *sql.DB
75-
InstanceKey string
76-
CollectInterval time.Duration
77-
EntryHandler loki.EntryHandler
78-
DisableQueryRedaction bool
79+
DB *sql.DB
80+
InstanceKey string
81+
CollectInterval time.Duration
82+
EntryHandler loki.EntryHandler
83+
DisableQueryRedaction bool
84+
AutoEnableSetupConsumers bool
7985

8086
Logger log.Logger
8187
}
8288

8389
type QuerySample struct {
84-
dbConnection *sql.DB
85-
instanceKey string
86-
collectInterval time.Duration
87-
entryHandler loki.EntryHandler
88-
sqlParser parser.Parser
89-
disableQueryRedaction bool
90+
dbConnection *sql.DB
91+
instanceKey string
92+
collectInterval time.Duration
93+
entryHandler loki.EntryHandler
94+
sqlParser parser.Parser
95+
disableQueryRedaction bool
96+
autoEnableSetupConsumers bool
9097

9198
logger log.Logger
9299
running *atomic.Bool
@@ -99,14 +106,15 @@ type QuerySample struct {
99106

100107
func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) {
101108
c := &QuerySample{
102-
dbConnection: args.DB,
103-
instanceKey: args.InstanceKey,
104-
collectInterval: args.CollectInterval,
105-
entryHandler: args.EntryHandler,
106-
sqlParser: parser.NewTiDBSqlParser(),
107-
disableQueryRedaction: args.DisableQueryRedaction,
108-
logger: log.With(args.Logger, "collector", QuerySampleName),
109-
running: &atomic.Bool{},
109+
dbConnection: args.DB,
110+
instanceKey: args.InstanceKey,
111+
collectInterval: args.CollectInterval,
112+
entryHandler: args.EntryHandler,
113+
sqlParser: parser.NewTiDBSqlParser(),
114+
disableQueryRedaction: args.DisableQueryRedaction,
115+
autoEnableSetupConsumers: args.AutoEnableSetupConsumers,
116+
logger: log.With(args.Logger, "collector", QuerySampleName),
117+
running: &atomic.Bool{},
110118
}
111119

112120
return c, nil
@@ -184,6 +192,12 @@ func (c *QuerySample) initializeBookmark(ctx context.Context) error {
184192
}
185193

186194
func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
195+
if c.autoEnableSetupConsumers {
196+
if err := c.updateSetupConsumersSettings(ctx); err != nil {
197+
return err
198+
}
199+
}
200+
187201
timeRow := c.dbConnection.QueryRowContext(ctx, selectNowAndUptime)
188202

189203
var now, uptime float64
@@ -381,6 +395,23 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
381395
return nil
382396
}
383397

398+
func (c *QuerySample) updateSetupConsumersSettings(ctx context.Context) error {
399+
rs, err := c.dbConnection.ExecContext(ctx, updateSetupConsumers)
400+
if err != nil {
401+
level.Error(c.logger).Log("msg", "failed to update performance_schema.setup_consumers", "err", err)
402+
return err
403+
}
404+
405+
rowsAffected, err := rs.RowsAffected()
406+
if err != nil {
407+
level.Error(c.logger).Log("msg", "failed to get rows affected from performance_schema.setup_consumers", "err", err)
408+
return err
409+
}
410+
level.Debug(c.logger).Log("msg", "updated performance_schema.setup_consumers", "rows_affected", rowsAffected)
411+
412+
return nil
413+
}
414+
384415
func (c *QuerySample) determineTimerClauseAndLimit(uptime float64) (string, float64) {
385416
timerClause := endOfTimeline
386417
currentOverflows := calculateNumberOfOverflows(uptime)

internal/component/database_observability/mysql/collector/query_sample_test.go

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2198,3 +2198,166 @@ func TestQuerySample_calculateTimerClauseAndLimit(t *testing.T) {
21982198
})
21992199
}
22002200
}
2201+
2202+
func TestQuerySample_AutoEnableSetupConsumers(t *testing.T) {
2203+
defer goleak.VerifyNone(t)
2204+
2205+
t.Run("executes updateSetupConsumers query when autoEnableSetupConsumers is true", func(t *testing.T) {
2206+
t.Parallel()
2207+
2208+
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
2209+
require.NoError(t, err)
2210+
defer db.Close()
2211+
2212+
lokiClient := loki_fake.NewClient(func() {})
2213+
2214+
collector, err := NewQuerySample(QuerySampleArguments{
2215+
DB: db,
2216+
InstanceKey: "mysql-db",
2217+
CollectInterval: time.Second,
2218+
EntryHandler: lokiClient,
2219+
Logger: log.NewLogfmtLogger(os.Stderr),
2220+
AutoEnableSetupConsumers: true,
2221+
})
2222+
require.NoError(t, err)
2223+
require.NotNil(t, collector)
2224+
2225+
mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().
2226+
WillReturnRows(
2227+
sqlmock.NewRows([]string{
2228+
"uptime",
2229+
}).AddRow(
2230+
"1",
2231+
),
2232+
)
2233+
2234+
mock.ExpectExec(updateSetupConsumers).WithoutArgs().WillReturnResult(sqlmock.NewResult(0, 1))
2235+
2236+
mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(
2237+
sqlmock.NewRows([]string{
2238+
"now",
2239+
"uptime",
2240+
}).AddRow(
2241+
5,
2242+
1,
2243+
))
2244+
2245+
mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, "", digestTextNotNullClause, endOfTimeline)).WithArgs(
2246+
1e12,
2247+
1e12,
2248+
).RowsWillBeClosed().
2249+
WillReturnRows(
2250+
sqlmock.NewRows([]string{
2251+
"statements.CURRENT_SCHEMA",
2252+
"statements.THREAD_ID",
2253+
"statements.EVENT_ID",
2254+
"statements.END_EVENT_ID",
2255+
"statements.DIGEST",
2256+
"statements.DIGEST_TEXT",
2257+
"statements.TIMER_END",
2258+
"statements.TIMER_WAIT",
2259+
"statements.CPU_TIME",
2260+
"statements.ROWS_EXAMINED",
2261+
"statements.ROWS_SENT",
2262+
"statements.ROWS_AFFECTED",
2263+
"statements.ERRORS",
2264+
"statements.MAX_CONTROLLED_MEMORY",
2265+
"statements.MAX_TOTAL_MEMORY",
2266+
"waits.event_id",
2267+
"waits.end_event_id",
2268+
"waits.event_name",
2269+
"waits.object_name",
2270+
"waits.object_type",
2271+
"waits.timer_wait",
2272+
}).AddRow(
2273+
"some_schema",
2274+
"890",
2275+
"123",
2276+
"234",
2277+
"some_digest",
2278+
"select * from some_table where id = 1",
2279+
"70000000",
2280+
"20000000",
2281+
"10000000",
2282+
"5",
2283+
"5",
2284+
"0",
2285+
"0",
2286+
"456",
2287+
"457",
2288+
nil,
2289+
nil,
2290+
nil,
2291+
nil,
2292+
nil,
2293+
nil,
2294+
),
2295+
)
2296+
2297+
err = collector.Start(t.Context())
2298+
require.NoError(t, err)
2299+
2300+
require.Eventually(t, func() bool {
2301+
return len(lokiClient.Received()) == 1
2302+
}, 5*time.Second, 100*time.Millisecond)
2303+
2304+
collector.Stop()
2305+
lokiClient.Stop()
2306+
2307+
require.Eventually(t, func() bool {
2308+
return collector.Stopped()
2309+
}, 5*time.Second, 100*time.Millisecond)
2310+
2311+
err = mock.ExpectationsWereMet()
2312+
require.NoError(t, err)
2313+
})
2314+
2315+
t.Run("handles updateSetupConsumers query error gracefully", func(t *testing.T) {
2316+
t.Parallel()
2317+
2318+
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
2319+
require.NoError(t, err)
2320+
defer db.Close()
2321+
2322+
lokiClient := loki_fake.NewClient(func() {})
2323+
2324+
collector, err := NewQuerySample(QuerySampleArguments{
2325+
DB: db,
2326+
InstanceKey: "mysql-db",
2327+
CollectInterval: time.Second,
2328+
EntryHandler: lokiClient,
2329+
Logger: log.NewLogfmtLogger(os.Stderr),
2330+
AutoEnableSetupConsumers: true,
2331+
})
2332+
require.NoError(t, err)
2333+
require.NotNil(t, collector)
2334+
2335+
mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().
2336+
WillReturnRows(
2337+
sqlmock.NewRows([]string{
2338+
"uptime",
2339+
}).AddRow(
2340+
"1",
2341+
),
2342+
)
2343+
2344+
mock.ExpectExec(updateSetupConsumers).WithoutArgs().WillReturnError(fmt.Errorf("setup consumers update failed"))
2345+
2346+
err = collector.Start(t.Context())
2347+
require.NoError(t, err)
2348+
2349+
require.Eventually(t, func() bool {
2350+
return !collector.Stopped()
2351+
}, 5*time.Second, 100*time.Millisecond)
2352+
2353+
collector.Stop()
2354+
lokiClient.Stop()
2355+
2356+
require.Eventually(t, func() bool {
2357+
return collector.Stopped()
2358+
}, 5*time.Second, 100*time.Millisecond)
2359+
2360+
err = mock.ExpectationsWereMet()
2361+
require.NoError(t, err)
2362+
})
2363+
}

internal/component/database_observability/mysql/component.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,12 @@ var (
5050
)
5151

5252
type Arguments struct {
53-
DataSourceName alloytypes.Secret `alloy:"data_source_name,attr"`
54-
CollectInterval time.Duration `alloy:"collect_interval,attr,optional"`
55-
ForwardTo []loki.LogsReceiver `alloy:"forward_to,attr"`
56-
EnableCollectors []string `alloy:"enable_collectors,attr,optional"`
57-
DisableCollectors []string `alloy:"disable_collectors,attr,optional"`
53+
DataSourceName alloytypes.Secret `alloy:"data_source_name,attr"`
54+
CollectInterval time.Duration `alloy:"collect_interval,attr,optional"`
55+
ForwardTo []loki.LogsReceiver `alloy:"forward_to,attr"`
56+
EnableCollectors []string `alloy:"enable_collectors,attr,optional"`
57+
DisableCollectors []string `alloy:"disable_collectors,attr,optional"`
58+
AllowUpdatePerfSchemaSettings bool `alloy:"allow_update_performance_schema_settings,attr,optional"`
5859

5960
// collector: 'setup_consumers'
6061
SetupConsumersCollectInterval time.Duration `alloy:"setup_consumers_collect_interval,attr,optional"`
@@ -69,11 +70,13 @@ type Arguments struct {
6970
LocksThreshold time.Duration `alloy:"locks_threshold,attr,optional"`
7071

7172
// collector: 'query_sample'
72-
DisableQueryRedaction bool `alloy:"disable_query_redaction,attr,optional"`
73+
DisableQueryRedaction bool `alloy:"disable_query_redaction,attr,optional"`
74+
AutoEnableSetupConsumers bool `alloy:"query_sample_auto_enable_setup_consumers,attr,optional"`
7375
}
7476

7577
var DefaultArguments = Arguments{
76-
CollectInterval: 1 * time.Minute,
78+
CollectInterval: 1 * time.Minute,
79+
AllowUpdatePerfSchemaSettings: false,
7780

7881
// collector: 'setup_consumers'
7982
SetupConsumersCollectInterval: 1 * time.Hour,
@@ -88,7 +91,8 @@ var DefaultArguments = Arguments{
8891
LocksThreshold: 1 * time.Second,
8992

9093
// collector: 'query_sample'
91-
DisableQueryRedaction: false,
94+
DisableQueryRedaction: false,
95+
AutoEnableSetupConsumers: false,
9296
}
9397

9498
func (a *Arguments) SetToDefault() {
@@ -322,12 +326,13 @@ func (c *Component) startCollectors() error {
322326

323327
if collectors[collector.QuerySampleName] {
324328
qsCollector, err := collector.NewQuerySample(collector.QuerySampleArguments{
325-
DB: dbConnection,
326-
InstanceKey: c.instanceKey,
327-
CollectInterval: c.args.CollectInterval,
328-
EntryHandler: entryHandler,
329-
Logger: c.opts.Logger,
330-
DisableQueryRedaction: c.args.DisableQueryRedaction,
329+
DB: dbConnection,
330+
InstanceKey: c.instanceKey,
331+
CollectInterval: c.args.CollectInterval,
332+
EntryHandler: entryHandler,
333+
Logger: c.opts.Logger,
334+
DisableQueryRedaction: c.args.DisableQueryRedaction,
335+
AutoEnableSetupConsumers: c.args.AllowUpdatePerfSchemaSettings && c.args.AutoEnableSetupConsumers,
331336
})
332337
if err != nil {
333338
level.Error(c.opts.Logger).Log("msg", "failed to create QuerySample collector", "err", err)

0 commit comments

Comments
 (0)