Skip to content

fix panic when updating config concurrently #1013

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
6e3dc83
fix: fixing panic when updating config concurrently
bejjrajesh Mar 19, 2025
a097938
fix: adding nil check
bejjrajesh Mar 19, 2025
7d743bd
fix: adding nil check
bejjrajesh Mar 20, 2025
cfd35a2
fix: making toggle true once agent connected
bejjrajesh Apr 9, 2025
d455735
fix: adding debug log for metricsSender readyToSend Status
bejjrajesh Apr 9, 2025
c7dbfb5
fix: updating with main
bejjrajesh Apr 11, 2025
4b8fd54
fix: debugging
bejjrajesh Apr 11, 2025
0928fc0
fix: debugging
bejjrajesh Apr 11, 2025
779ef6c
fix: updating config reader features when on disk config changed
bejjrajesh Apr 11, 2025
fd17a92
fix: updating readyToSend when config changed
bejjrajesh Apr 12, 2025
dbee456
fix: updating readyToSend when config changed
bejjrajesh Apr 14, 2025
92f037b
fix: updating readyToSend when config changed
bejjrajesh Apr 15, 2025
f65680f
fix: adding conf to metrics sender
bejjrajesh Apr 16, 2025
5adfeac
fix: removing debug logs
bejjrajesh Apr 17, 2025
9c9f838
fix: fixing import issue
bejjrajesh Apr 17, 2025
a8dcf7b
Add nil pointer check to GenerateMetricsReportBundle (#1047)
dhurley Apr 16, 2025
fcb499e
Update net and nats dependencies (#1070)
dhurley May 1, 2025
1ec3554
fix: updating with main
bejjrajesh May 7, 2025
68eca9f
fix: running go deps
bejjrajesh May 7, 2025
468e6cc
fix: updating go.sum
bejjrajesh May 7, 2025
1dd35f0
fix: fixing test failures
bejjrajesh May 7, 2025
c299b17
fix: updating vendor folders
bejjrajesh May 7, 2025
2b35964
fix: updating go.sum
bejjrajesh May 7, 2025
16bb285
fix: fixing conflict
bejjrajesh May 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2686,6 +2686,7 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
Expand Down Expand Up @@ -2832,6 +2833,7 @@ github.com/google/go-replayers/grpcreplay v0.1.0/go.mod h1:8Ig2Idjpr6gifRd6pNVgg
github.com/google/go-replayers/httpreplay v0.1.0/go.mod h1:YKZViNhiGgqdBlUbI2MwGpq4pXxNmhJLPHQ7cv2b5no=
github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.E

if (loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsSender)) && reporter != nil {
corePlugins = append(corePlugins,
NewMetricsSender(reporter),
NewMetricsSender(reporter, loadedConfig),
)
}

Expand Down
3 changes: 3 additions & 0 deletions src/plugins/config_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig)
}

if synchronizeFeatures {
log.Debugf("agent config features changed, synchronizing features")
r.synchronizeFeatures(payloadAgentConfig)
r.config.Features = payloadAgentConfig.Details.Features
}

r.messagePipeline.Process(core.NewMessage(core.AgentConfigChanged, payloadAgentConfig))
Expand All @@ -164,6 +166,7 @@ func (r *ConfigReader) synchronizeFeatures(agtCfg *proto.AgentConfig) {
r.detailsMu.RLock()
for _, feature := range r.config.Features {
if feature != agent_config.FeatureRegistration && feature != agent_config.FeatureNginxConfigAsync {
log.Debugf("config_reader: deregistering the feature %s", feature)
r.deRegisterPlugin(feature)
}
}
Expand Down
47 changes: 27 additions & 20 deletions src/plugins/dataplane_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,26 @@ import (
)

type DataPlaneStatus struct {
messagePipeline core.MessagePipeInterface
ctx context.Context
sendStatus chan bool
healthTicker *time.Ticker
interval time.Duration
meta *proto.Metadata
binary core.NginxBinary
env core.Environment
version string
tags *[]string
configDirs string
lastSendDetails time.Time
envHostInfo *proto.HostInfo
reportInterval time.Duration
softwareDetails map[string]*proto.DataplaneSoftwareDetails
nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus
softwareDetailsMutex sync.RWMutex
structMu sync.RWMutex
processes []*core.Process
messagePipeline core.MessagePipeInterface
ctx context.Context
sendStatus chan bool
healthTicker *time.Ticker
interval time.Duration
meta *proto.Metadata
binary core.NginxBinary
env core.Environment
version string
tags *[]string
configDirs string
lastSendDetails time.Time
envHostInfo *proto.HostInfo
reportInterval time.Duration
softwareDetails map[string]*proto.DataplaneSoftwareDetails
nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus
nginxConfigActivityStatusesMutex sync.RWMutex
softwareDetailsMutex sync.RWMutex
structMu sync.RWMutex
processes []*core.Process
}

const (
Expand Down Expand Up @@ -81,7 +82,9 @@ func (dps *DataPlaneStatus) Init(pipeline core.MessagePipeInterface) {

func (dps *DataPlaneStatus) Close() {
log.Info("DataPlaneStatus is wrapping up")
dps.nginxConfigActivityStatusesMutex.Lock()
dps.nginxConfigActivityStatuses = nil
dps.nginxConfigActivityStatusesMutex.Unlock()
dps.softwareDetailsMutex.Lock()
dps.softwareDetails = nil
dps.softwareDetailsMutex.Unlock()
Expand Down Expand Up @@ -144,8 +147,10 @@ func (dps *DataPlaneStatus) Subscriptions() []string {

func (dps *DataPlaneStatus) updateNginxConfigActivityStatuses(newAgentActivityStatus *proto.AgentActivityStatus) {
log.Tracef("DataplaneStatus: Updating nginxConfigActivityStatuses with %v", newAgentActivityStatus)
if _, ok := newAgentActivityStatus.GetStatus().(*proto.AgentActivityStatus_NginxConfigStatus); ok {
if _, ok := newAgentActivityStatus.GetStatus().(*proto.AgentActivityStatus_NginxConfigStatus); dps.nginxConfigActivityStatuses != nil && ok {
dps.nginxConfigActivityStatusesMutex.Lock()
dps.nginxConfigActivityStatuses[newAgentActivityStatus.GetNginxConfigStatus().GetNginxId()] = newAgentActivityStatus
dps.nginxConfigActivityStatusesMutex.Unlock()
}
}

Expand Down Expand Up @@ -184,6 +189,8 @@ func (dps *DataPlaneStatus) healthGoRoutine(pipeline core.MessagePipeInterface)
func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneStatus {
forceDetails = forceDetails || time.Now().UTC().Add(-dps.reportInterval).After(dps.lastSendDetails)

dps.nginxConfigActivityStatusesMutex.Lock()
defer dps.nginxConfigActivityStatusesMutex.Unlock()
agentActivityStatuses := []*proto.AgentActivityStatus{}
for _, nginxConfigActivityStatus := range dps.nginxConfigActivityStatuses {
agentActivityStatuses = append(agentActivityStatuses, nginxConfigActivityStatus)
Expand Down
8 changes: 4 additions & 4 deletions src/plugins/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (f *Features) Process(msg *core.Message) {

func (f *Features) enableMetricsFeature(_ string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) {

log.Debugf("features.go: enabling metrics feature")
conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
Expand All @@ -144,7 +144,7 @@ func (f *Features) enableMetricsFeature(_ string) []core.Plugin {

metrics := NewMetrics(f.conf, f.env, f.binary, f.processes)
metricsThrottle := NewMetricsThrottle(f.conf, f.env)
metricsSender := NewMetricsSender(f.commander)
metricsSender := NewMetricsSender(f.commander, conf)

return []core.Plugin{metrics, metricsThrottle, metricsSender}
}
Expand Down Expand Up @@ -188,14 +188,14 @@ func (f *Features) enableMetricsThrottleFeature(_ string) []core.Plugin {
func (f *Features) enableMetricsSenderFeature(_ string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) &&
!f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) {

log.Debugf("features.go: enabling metrics_sender")
conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
}
f.conf = conf

metricsSender := NewMetricsSender(f.commander)
metricsSender := NewMetricsSender(f.commander, conf)

return []core.Plugin{metricsSender}
}
Expand Down
65 changes: 52 additions & 13 deletions src/plugins/metrics_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,37 @@ package plugins
import (
"context"
"strings"
"sync"

"github.com/nginx/agent/sdk/v2"

agent_config "github.com/nginx/agent/sdk/v2/agent/config"
"github.com/nginx/agent/sdk/v2/client"
"github.com/nginx/agent/sdk/v2/proto"
models "github.com/nginx/agent/sdk/v2/proto/events"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/config"

log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
)

type MetricsSender struct {
reporter client.MetricReporter
pipeline core.MessagePipeInterface
ctx context.Context
started *atomic.Bool
readyToSend *atomic.Bool
reporter client.MetricReporter
pipeline core.MessagePipeInterface
ctx context.Context
started *atomic.Bool
readyToSend *atomic.Bool
readyToSendMu sync.RWMutex
conf *config.Config
}

func NewMetricsSender(reporter client.MetricReporter) *MetricsSender {
func NewMetricsSender(reporter client.MetricReporter, config *config.Config) *MetricsSender {
return &MetricsSender{
reporter: reporter,
started: atomic.NewBool(false),
readyToSend: atomic.NewBool(false),
conf: config,
}
}

Expand All @@ -50,8 +56,10 @@ func (r *MetricsSender) Init(pipeline core.MessagePipeInterface) {

func (r *MetricsSender) Close() {
log.Info("MetricsSender is wrapping up")
r.readyToSendMu.Lock()
r.started.Store(false)
r.readyToSend.Store(false)
defer r.readyToSendMu.Unlock()
}

func (r *MetricsSender) Info() *core.Info {
Expand All @@ -60,21 +68,29 @@ func (r *MetricsSender) Info() *core.Info {

func (r *MetricsSender) Process(msg *core.Message) {
if msg.Exact(core.AgentConnected) {
r.readyToSend.Toggle()
return
if r.conf.Features != nil && r.isFeatureEnabled(r.conf.Features) {
r.readyToSendMu.Lock()
r.readyToSend.Store(true)
r.readyToSendMu.Unlock()
} else {
r.readyToSendMu.Lock()
r.readyToSend.Store(false)
r.readyToSendMu.Unlock()
}
}

if msg.Exact(core.CommMetrics) {
payloads, ok := msg.Data().([]core.Payload)
if !ok {
log.Warnf("Failed to coerce Message to []Payload: %v", msg.Data())
return
}
defer r.readyToSendMu.RUnlock()
for _, p := range payloads {
r.readyToSendMu.RLock()
if !r.readyToSend.Load() {
log.Debugf("metrics_sender is not ready to send the metrics")
continue
}

switch report := p.(type) {
case *proto.MetricsReport:
message := client.MessageFromMetrics(report)
Expand All @@ -99,9 +115,9 @@ func (r *MetricsSender) Process(msg *core.Message) {
}
}
} else if msg.Exact(core.AgentConfigChanged) {
switch config := msg.Data().(type) {
switch agentConfig := msg.Data().(type) {
case *proto.AgentConfig:
r.metricSenderBackoff(config)
r.metricSenderBackoff(agentConfig)
default:
log.Warnf("metrics sender expected %T type, but got: %T", &proto.AgentConfig{}, msg.Data())
}
Expand All @@ -110,7 +126,17 @@ func (r *MetricsSender) Process(msg *core.Message) {

func (r *MetricsSender) metricSenderBackoff(agentConfig *proto.AgentConfig) {
log.Debugf("update metric reporter client configuration to %+v", agentConfig)

if agentConfig.Details.Features != nil {
if r.isFeatureEnabled(agentConfig.Details.Features) {
r.readyToSendMu.Lock()
r.readyToSend.Store(true)
r.readyToSendMu.Unlock()
} else {
r.readyToSendMu.Lock()
r.readyToSend.Store(false)
r.readyToSendMu.Unlock()
}
}
if agentConfig.GetDetails() == nil || agentConfig.GetDetails().GetServer() == nil || agentConfig.GetDetails().GetServer().GetBackoff() == nil {
log.Debug("not updating metric reporter client configuration as new Agent backoff settings is nil")
return
Expand All @@ -123,3 +149,16 @@ func (r *MetricsSender) metricSenderBackoff(agentConfig *proto.AgentConfig) {
func (r *MetricsSender) Subscriptions() []string {
return []string{core.CommMetrics, core.AgentConnected, core.AgentConfigChanged}
}

func (r *MetricsSender) isFeatureEnabled(features []string) bool {
var isFeatureEnabled bool
if features != nil {
for _, feature := range features {
if feature == agent_config.FeatureMetricsSender {
isFeatureEnabled = true
break
}
}
}
return isFeatureEnabled
}
8 changes: 5 additions & 3 deletions src/plugins/metrics_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"testing"
"time"

"github.com/nginx/agent/v2/src/core/config"

"github.com/nginx/agent/sdk/v2/backoff"
"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core"
Expand Down Expand Up @@ -43,7 +45,7 @@ func TestMetricsSenderSendMetrics(t *testing.T) {
ctx := context.TODO()
mockMetricsReportClient := tutils.NewMockMetricsReportClient()
mockMetricsReportClient.Mock.On("Send", ctx, mock.Anything).Return(test.err)
pluginUnderTest := NewMetricsSender(mockMetricsReportClient)
pluginUnderTest := NewMetricsSender(mockMetricsReportClient, &config.Config{ClientID: "12345", Features: []string{"metrics-sender"}})

assert.False(t, pluginUnderTest.started.Load())
assert.False(t, pluginUnderTest.readyToSend.Load())
Expand Down Expand Up @@ -110,7 +112,7 @@ func TestMetricsSenderBackoff(t *testing.T) {
t.Run(test.name, func(_ *testing.T) {
ctx := context.TODO()
mockMetricsReportClient := tutils.NewMockMetricsReportClient()
pluginUnderTest := NewMetricsSender(mockMetricsReportClient)
pluginUnderTest := NewMetricsSender(mockMetricsReportClient, &config.Config{ClientID: "12345", Features: []string{"metrics-sender"}})

pluginUnderTest.Init(core.NewMockMessagePipe(ctx))
pluginUnderTest.Process(core.NewMessage(core.AgentConnected, nil))
Expand All @@ -130,6 +132,6 @@ func TestMetricsSenderBackoff(t *testing.T) {
}

func TestMetricsSenderSubscriptions(t *testing.T) {
pluginUnderTest := NewMetricsSender(tutils.NewMockMetricsReportClient())
pluginUnderTest := NewMetricsSender(tutils.NewMockMetricsReportClient(), &config.Config{ClientID: "12345"})
assert.Equal(t, []string{core.CommMetrics, core.AgentConnected, core.AgentConfigChanged}, pluginUnderTest.Subscriptions())
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestNAPMonitoring(t *testing.T) {
return
}

metricsSender := plugins.NewMetricsSender(reporter)
metricsSender := plugins.NewMetricsSender(reporter, &config.Config{ClientID: "12345", Features: []string{"metrics-sender"}})

env := tutils.NewMockEnvironment()
env.On("NewHostInfo", testifyMock.Anything, testifyMock.Anything, testifyMock.Anything).Return(&sdkPb.HostInfo{
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading