
Add Telemetry collector structure with some collected resources #1497


Merged: 43 commits merged on Feb 6, 2024
Changes from 1 commit
Commits (43)
951a631
WIP
bjee19 Jan 19, 2024
55320c6
Add graph resource count but still WIP
bjee19 Jan 22, 2024
3b43d1f
Add small formatting
bjee19 Jan 22, 2024
d6181a6
Add review feedback
bjee19 Jan 23, 2024
f51d0ee
Add version+name and adjust graphGetter
bjee19 Jan 23, 2024
0e467db
Change k8sClient to client Reader
bjee19 Jan 24, 2024
f7a2b08
Add small feedback review
bjee19 Jan 24, 2024
266e713
Add more feedback from review
bjee19 Jan 24, 2024
5a4f0c7
Add feedback and cfg
bjee19 Jan 25, 2024
e400f5e
Change ReferencedServices and add generic counting function
bjee19 Jan 25, 2024
cfc0519
Add collector tests
bjee19 Jan 26, 2024
f4e16a7
Refactor style on collector tests
bjee19 Jan 26, 2024
4372871
Add small style changes and documentation
bjee19 Jan 26, 2024
7eec0df
Add error case for latest graph being nil
bjee19 Jan 26, 2024
a1e230f
Add small feedback for error message
bjee19 Jan 26, 2024
d92cd83
Add latest configuration and tests
bjee19 Jan 26, 2024
130ce47
Add nodes resource to RBAC
bjee19 Jan 29, 2024
8b11f1d
Add additional documentation on exported types and functions
bjee19 Jan 29, 2024
905f476
Add lock to eventHandlerImpl
bjee19 Jan 30, 2024
1e455e5
Add GetLatestGraph into change processor tests
bjee19 Jan 30, 2024
4062c72
Add expData to job test
bjee19 Jan 30, 2024
c9749a6
Add GetLatestConfiguration to handler tests
bjee19 Jan 31, 2024
73e390d
Revert ReferencedServices to no longer store services
bjee19 Jan 31, 2024
1991643
Add feedback for collector tests
bjee19 Feb 1, 2024
89eb5ce
Add count for ignored gateway classes and gateways
bjee19 Feb 1, 2024
7213348
Add HealthChecker and blocking on job until NGF is ready
bjee19 Feb 1, 2024
32be3b7
Add job tests for waiting on health checker
bjee19 Feb 1, 2024
c07cb17
Add review feedback
bjee19 Feb 2, 2024
57805ad
Refactor CreateTelemetryJobWorker and job tests
bjee19 Feb 2, 2024
4ed9948
Add FIXME and small comment
bjee19 Feb 2, 2024
49da1d3
Update manifests
bjee19 Feb 2, 2024
63f3a76
Add back createTelemetryJob
bjee19 Feb 2, 2024
a091bde
Add job worker name change and test changes
bjee19 Feb 2, 2024
6caf023
Change job worker tests to be unit tests
bjee19 Feb 2, 2024
9a1f99b
Add generic ReadyChannel to cronjob
bjee19 Feb 3, 2024
8119c21
Refactor collector tests to add list calls helper function
bjee19 Feb 3, 2024
455512a
Remove health checker interface
bjee19 Feb 4, 2024
0025558
Add setLatestConfiguration
bjee19 Feb 4, 2024
36fae13
Add small fixes
bjee19 Feb 4, 2024
e3a71e9
Add check for context canceled
bjee19 Feb 5, 2024
9ca0236
Add small line fix
bjee19 Feb 5, 2024
de6e3bc
Add review feedback
bjee19 Feb 5, 2024
89c41d5
Move test case around in collector tests
bjee19 Feb 5, 2024
Add generic ReadyChannel to cronjob
bjee19 committed Feb 6, 2024
commit 9a1f99beabbbd6dc3214aea6daf64dd84380547e
9 changes: 9 additions & 0 deletions internal/framework/runnables/cronjob.go
@@ -13,6 +13,8 @@ import (
type CronJobConfig struct {
// Worker is the function that will be run for every cronjob iteration.
Worker func(context.Context)
// ReadyCh represents if the cronjob is ready to start.
ReadyCh <-chan struct{}
// Logger is the logger.
Logger logr.Logger
// Period defines the period of the cronjob. The cronjob will run every Period.
@@ -37,6 +39,13 @@ func NewCronJob(cfg CronJobConfig) *CronJob {
// Start starts the cronjob.
// Implements controller-runtime manager.Runnable
func (j *CronJob) Start(ctx context.Context) error {
select {
case <-j.cfg.ReadyCh:
case <-ctx.Done():
j.cfg.Logger.Info("Context canceled, failed to start cronjob")
return ctx.Err()
}

j.cfg.Logger.Info("Starting cronjob")

sliding := true // This means the period with jitter will be calculated after each worker call.
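
A minimal sketch of how a caller can wire the new ReadyCh gate, based on the CronJobConfig fields and NewCronJob/Start shown above. The package import path, logger, and surrounding setup are illustrative assumptions, not taken from this PR:

package main

import (
	"context"
	"time"

	"sigs.k8s.io/controller-runtime/pkg/log/zap"

	"github.com/nginxinc/nginx-gateway-fabric/internal/framework/runnables"
)

func main() {
	readyCh := make(chan struct{})

	job := runnables.NewCronJob(runnables.CronJobConfig{
		Worker:  func(ctx context.Context) { /* periodic work */ },
		Logger:  zap.New(),
		Period:  24 * time.Hour,
		ReadyCh: readyCh, // Start blocks on this channel before the first run
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		// Start returns ctx.Err() if the context is canceled before readiness.
		_ = job.Start(ctx)
	}()

	close(readyCh) // signal readiness; the cronjob begins its periodic runs
}
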
53 changes: 50 additions & 3 deletions internal/framework/runnables/cronjob_test.go
@@ -7,11 +7,18 @@ import (

. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry/telemetryfakes"
)

func TestCronJob(t *testing.T) {
g := NewWithT(t)

healthCollector := &telemetryfakes.FakeHealthChecker{}

readyChannel := make(chan struct{})
healthCollector.GetReadyChReturns(readyChannel)

timeout := 10 * time.Second
var callCount int

@@ -22,9 +29,10 @@ }
}

cfg := CronJobConfig{
Worker: worker,
Logger: zap.New(),
Period: 1 * time.Millisecond, // 1ms is much smaller than timeout so the CronJob should run a few times
Worker: worker,
Logger: zap.New(),
Period: 1 * time.Millisecond, // 1ms is much smaller than timeout so the CronJob should run a few times
ReadyCh: healthCollector.GetReadyCh(),
}
job := NewCronJob(cfg)

@@ -35,6 +43,7 @@ func TestCronJob(t *testing.T) {
errCh <- job.Start(ctx)
close(errCh)
}()
close(readyChannel)

minReports := 2 // ensure that the CronJob reports more than once: it doesn't exit after the first run

@@ -44,3 +53,41 @@ func TestCronJob(t *testing.T) {
g.Eventually(errCh).Should(Receive(BeNil()))
g.Eventually(errCh).Should(BeClosed())
}

func TestCronJob_ContextCanceled(t *testing.T) {
g := NewWithT(t)

healthCollector := &telemetryfakes.FakeHealthChecker{}

readyChannel := make(chan struct{})
healthCollector.GetReadyChReturns(readyChannel)

timeout := 10 * time.Second
var callCount int

valCh := make(chan int, 128)
worker := func(context.Context) {
callCount++
valCh <- callCount
}

cfg := CronJobConfig{
Worker: worker,
Logger: zap.New(),
Period: 1 * time.Millisecond, // 1ms is much smaller than timeout so the CronJob should run a few times
ReadyCh: healthCollector.GetReadyCh(),
}
job := NewCronJob(cfg)

ctx, cancel := context.WithTimeout(context.Background(), timeout)

errCh := make(chan error)
go func() {
errCh <- job.Start(ctx)
close(errCh)
}()
cancel()

g.Eventually(errCh).Should(Receive())
g.Eventually(errCh).Should(BeClosed())
}
6 changes: 3 additions & 3 deletions internal/mode/static/handler_test.go
@@ -429,7 +429,7 @@ var _ = Describe("eventHandler", func() {
It("should set the health checker status properly when there are changes", func() {
e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}}
batch := []interface{}{e}
readyChannel := handler.cfg.healthChecker.GetReadyIfClosedChannel()
readyChannel := handler.cfg.healthChecker.GetReadyCh()

fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{})

@@ -446,7 +446,7 @@ var _ = Describe("eventHandler", func() {
It("should set the health checker status properly when there are no changes or errors", func() {
e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}}
batch := []interface{}{e}
readyChannel := handler.cfg.healthChecker.GetReadyIfClosedChannel()
readyChannel := handler.cfg.healthChecker.GetReadyCh()

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
@@ -461,7 +461,7 @@ var _ = Describe("eventHandler", func() {
It("should set the health checker status properly when there is an error", func() {
e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}}
batch := []interface{}{e}
readyChannel := handler.cfg.healthChecker.GetReadyIfClosedChannel()
readyChannel := handler.cfg.healthChecker.GetReadyCh()

fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{})
fakeNginxRuntimeMgr.ReloadReturns(errors.New("reload error"))
18 changes: 9 additions & 9 deletions internal/mode/static/health.go
@@ -9,7 +9,7 @@ import (
// newHealthCheckerImpl creates a new healthCheckerImpl.
func newHealthCheckerImpl() *healthCheckerImpl {
return &healthCheckerImpl{
readyIfClosedChan: make(chan struct{}),
readyCh: make(chan struct{}),
}
}

@@ -18,10 +18,10 @@ type healthCheckerImpl struct {
// firstBatchError is set when the first batch fails to configure nginx
// and we don't want to set ourselves as ready on the next batch if nothing changes
firstBatchError error
// readyIfClosedChan is a channel that is initialized in NewHealthCheckerImpl and represents if the NGF Pod is ready.
readyIfClosedChan chan struct{}
lock sync.RWMutex
ready bool
// readyCh is a channel that is initialized in NewHealthCheckerImpl and represents if the NGF Pod is ready.
readyCh chan struct{}
lock sync.RWMutex
ready bool
}

// readyCheck returns the ready-state of the Pod. It satisfies the controller-runtime Checker type.
@@ -45,10 +45,10 @@ func (h *healthCheckerImpl) setAsReady() {

h.ready = true
h.firstBatchError = nil
close(h.readyIfClosedChan)
close(h.readyCh)
}

// GetReadyIfClosedChannel returns a read-only channel, which determines if the NGF Pod is ready.
func (h *healthCheckerImpl) GetReadyIfClosedChannel() <-chan struct{} {
return h.readyIfClosedChan
// GetReadyCh returns a read-only channel, which determines if the NGF Pod is ready.
func (h *healthCheckerImpl) GetReadyCh() <-chan struct{} {
return h.readyCh
}
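
The rename to readyCh keeps the close-to-signal convention: setAsReady closes the channel exactly once, and every receive on the channel returned by GetReadyCh unblocks from that point on. A self-contained sketch of that pattern, independent of the NGF types:

package main

import "fmt"

func main() {
	readyCh := make(chan struct{})
	done := make(chan struct{})

	go func() {
		<-readyCh // blocks until the channel is closed
		fmt.Println("reader released")
		close(done)
	}()

	close(readyCh) // signal readiness exactly once
	<-done

	<-readyCh // receives on a closed channel return immediately
}
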
3 changes: 2 additions & 1 deletion internal/mode/static/manager.go
@@ -413,7 +413,7 @@ func createTelemetryJob(
logger := cfg.Logger.WithName("telemetryJob")
exporter := telemetry.NewLoggingExporter(cfg.Logger.WithName("telemetryExporter").V(1 /* debug */))

worker := telemetry.CreateTelemetryJobWorker(logger, exporter, dataCollector, hc)
worker := telemetry.CreateTelemetryJobWorker(logger, exporter, dataCollector)

// 10 min jitter is enough per telemetry destination recommendation
// For the default period of 24 hours, jitter will be 10min /(24*60)min = 0.0069
@@ -426,6 +426,7 @@ func createTelemetryJob(
Logger: logger,
Period: cfg.TelemetryReportPeriod,
JitterFactor: jitterFactor,
ReadyCh: hc.GetReadyCh(),
},
),
}
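
The jitter comment in this hunk works out as follows; a short sketch of the arithmetic (the variable names are illustrative, since the actual computation sits outside the shown lines):

package main

import (
	"fmt"
	"time"
)

func main() {
	jitter := 10 * time.Minute
	period := 24 * time.Hour // the default TelemetryReportPeriod

	fmt.Println(jitter.Seconds() / period.Seconds()) // 10 / (24*60) ≈ 0.0069
}
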
13 changes: 2 additions & 11 deletions internal/mode/static/telemetry/job_worker.go
@@ -18,25 +18,16 @@ type DataCollector interface {

// HealthChecker checks if the NGF Pod is ready.
type HealthChecker interface {
// GetReadyIfClosedChannel returns a channel which determines if the NGF Pod is ready.
GetReadyIfClosedChannel() <-chan struct{}
// GetReadyCh returns a channel which determines if the NGF Pod is ready.
GetReadyCh() <-chan struct{}
}

func CreateTelemetryJobWorker(
logger logr.Logger,
exporter Exporter,
dataCollector DataCollector,
healthChecker HealthChecker,
) func(ctx context.Context) {
return func(ctx context.Context) {
readyChannel := healthChecker.GetReadyIfClosedChannel()
select {
case <-readyChannel:
case <-ctx.Done():
logger.Info("Context canceled, failed to start telemetry job")
return
}

// Gather telemetry
logger.V(1).Info("Gathering telemetry data")

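
With the readiness select moved into the CronJob, the worker closure reduces to the collect-and-export path. A hedged sketch of what the returned worker might look like after this change; the Collect and Export signatures are assumptions (only their call counts appear in this PR's tests), not the verbatim remainder of the file:

func CreateTelemetryJobWorker(
	logger logr.Logger,
	exporter Exporter,
	dataCollector DataCollector,
) func(ctx context.Context) {
	return func(ctx context.Context) {
		// Gather telemetry
		logger.V(1).Info("Gathering telemetry data")

		data, err := dataCollector.Collect(ctx) // assumed signature
		if err != nil {
			logger.Error(err, "Failed to collect telemetry data")
			return
		}

		// Export telemetry
		if err := exporter.Export(ctx, data); err != nil { // assumed signature
			logger.Error(err, "Failed to export telemetry data")
		}
	}
}
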
29 changes: 2 additions & 27 deletions internal/mode/static/telemetry/job_worker_test.go
@@ -20,9 +20,9 @@ func TestCreateTelemetryJobWorker(t *testing.T) {
healthCollector := &telemetryfakes.FakeHealthChecker{}

readyChannel := make(chan struct{})
healthCollector.GetReadyIfClosedChannelReturns(readyChannel)
healthCollector.GetReadyChReturns(readyChannel)

worker := telemetry.CreateTelemetryJobWorker(zap.New(), exporter, dataCollector, healthCollector)
worker := telemetry.CreateTelemetryJobWorker(zap.New(), exporter, dataCollector)

expData := telemetry.Data{
ProjectMetadata: telemetry.ProjectMetadata{Name: "NGF", Version: "1.1"},
@@ -51,28 +51,3 @@ func TestCreateTelemetryJobWorker(t *testing.T) {
g.Expect(data).To(Equal(expData))
cancel()
}

func TestCreateTelemetryJobWorker_ContextCanceled(t *testing.T) {
g := NewWithT(t)

exporter := &telemetryfakes.FakeExporter{}
dataCollector := &telemetryfakes.FakeDataCollector{}
healthCollector := &telemetryfakes.FakeHealthChecker{}

readyChannel := make(chan struct{})
healthCollector.GetReadyIfClosedChannelReturns(readyChannel)

worker := telemetry.CreateTelemetryJobWorker(zap.New(), exporter, dataCollector, healthCollector)

timeout := 10 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)

go func() {
worker(ctx)
}()

cancel()

g.Expect(dataCollector.CollectCallCount()).To(BeZero())
g.Expect(exporter.ExportCallCount()).To(BeZero())
}
