@@ -71,23 +71,30 @@ WHERE
71
71
AND statements.CURRENT_SCHEMA NOT IN ('mysql', 'performance_schema', 'sys', 'information_schema')
72
72
%s %s`
73
73
74
+ const updateSetupConsumers = `
75
+ UPDATE performance_schema.setup_consumers
76
+ SET enabled = 'yes'
77
+ WHERE name = 'events_statements_cpu'`
78
+
74
79
type QuerySampleArguments struct {
75
- DB * sql.DB
76
- InstanceKey string
77
- CollectInterval time.Duration
78
- EntryHandler loki.EntryHandler
79
- DisableQueryRedaction bool
80
+ DB * sql.DB
81
+ InstanceKey string
82
+ CollectInterval time.Duration
83
+ EntryHandler loki.EntryHandler
84
+ DisableQueryRedaction bool
85
+ AutoEnableSetupConsumers bool
80
86
81
87
Logger log.Logger
82
88
}
83
89
84
90
type QuerySample struct {
85
- dbConnection * sql.DB
86
- instanceKey string
87
- collectInterval time.Duration
88
- entryHandler loki.EntryHandler
89
- sqlParser parser.Parser
90
- disableQueryRedaction bool
91
+ dbConnection * sql.DB
92
+ instanceKey string
93
+ collectInterval time.Duration
94
+ entryHandler loki.EntryHandler
95
+ sqlParser parser.Parser
96
+ disableQueryRedaction bool
97
+ autoEnableSetupConsumers bool
91
98
92
99
logger log.Logger
93
100
running * atomic.Bool
@@ -100,14 +107,15 @@ type QuerySample struct {
100
107
101
108
func NewQuerySample (args QuerySampleArguments ) (* QuerySample , error ) {
102
109
c := & QuerySample {
103
- dbConnection : args .DB ,
104
- instanceKey : args .InstanceKey ,
105
- collectInterval : args .CollectInterval ,
106
- entryHandler : args .EntryHandler ,
107
- sqlParser : parser .NewTiDBSqlParser (),
108
- disableQueryRedaction : args .DisableQueryRedaction ,
109
- logger : log .With (args .Logger , "collector" , QuerySampleName ),
110
- running : & atomic.Bool {},
110
+ dbConnection : args .DB ,
111
+ instanceKey : args .InstanceKey ,
112
+ collectInterval : args .CollectInterval ,
113
+ entryHandler : args .EntryHandler ,
114
+ sqlParser : parser .NewTiDBSqlParser (),
115
+ disableQueryRedaction : args .DisableQueryRedaction ,
116
+ autoEnableSetupConsumers : args .AutoEnableSetupConsumers ,
117
+ logger : log .With (args .Logger , "collector" , QuerySampleName ),
118
+ running : & atomic.Bool {},
111
119
}
112
120
113
121
return c , nil
@@ -185,6 +193,12 @@ func (c *QuerySample) initializeBookmark(ctx context.Context) error {
185
193
}
186
194
187
195
func (c * QuerySample ) fetchQuerySamples (ctx context.Context ) error {
196
+ if c .autoEnableSetupConsumers {
197
+ if err := c .checkSetupConsumersSettings (ctx ); err != nil {
198
+ return err
199
+ }
200
+ }
201
+
188
202
timeRow := c .dbConnection .QueryRowContext (ctx , selectNowAndUptime )
189
203
190
204
var now , uptime float64
@@ -382,6 +396,23 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
382
396
return nil
383
397
}
384
398
399
+ func (c * QuerySample ) checkSetupConsumersSettings (ctx context.Context ) error {
400
+ rs , err := c .dbConnection .ExecContext (ctx , updateSetupConsumers )
401
+ if err != nil {
402
+ level .Error (c .logger ).Log ("msg" , "failed to update performance_schema.setup_consumers" , "err" , err )
403
+ return err
404
+ }
405
+
406
+ rowsAffected , err := rs .RowsAffected ()
407
+ if err != nil {
408
+ level .Error (c .logger ).Log ("msg" , "failed to get rows affected from performance_schema.setup_consumers" , "err" , err )
409
+ return err
410
+ }
411
+ level .Debug (c .logger ).Log ("msg" , "updated performance_schema.setup_consumers" , "rows_affected" , rowsAffected )
412
+
413
+ return nil
414
+ }
415
+
385
416
func (c * QuerySample ) calculateWallTime (serverStartTime , timer float64 ) float64 {
386
417
// timer indicates event timing since server startup.
387
418
// The timer value is in picoseconds with a column type of bigint unsigned. This value can overflow after about ~213 days.
0 commit comments