From d434530b48bd0eb024e8efd48bcde72cb36a36e3 Mon Sep 17 00:00:00 2001
From: Kacper Sawicki
Date: Wed, 4 Feb 2026 14:31:06 +0000
Subject: [PATCH 1/5] fix: make Close() idempotent and add Close tests

- Add sync.Once to podEventLogger to ensure Close() is safe to call multiple times
- Add TestCloseIdempotent to verify double-close doesn't panic
- Add TestCloseDuringProcessing to verify close during active processing
---
 logger.go      | 12 ++++++--
 logger_test.go | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 93 insertions(+), 3 deletions(-)

diff --git a/logger.go b/logger.go
index 0f53ebc..0c712a8 100644
--- a/logger.go
+++ b/logger.go
@@ -119,6 +119,9 @@ type podEventLogger struct {
 
 	// hasSyncedFuncs tracks informer cache sync functions for testing
 	hasSyncedFuncs []cache.InformerSynced
+
+	// closeOnce ensures Close() is idempotent
+	closeOnce sync.Once
 }
 
 // resolveEnvValue resolves the value of an environment variable, supporting both
@@ -411,10 +414,13 @@ func (p *podEventLogger) sendDelete(token string) {
 }
 
 // Close stops the pod event logger and releases all resources.
+// Close is idempotent and safe to call multiple times.
 func (p *podEventLogger) Close() error {
-	p.cancelFunc()
-	close(p.stopChan)
-	close(p.errChan)
+	p.closeOnce.Do(func() {
+		p.cancelFunc()
+		close(p.stopChan)
+		close(p.errChan)
+	})
 	return nil
 }
 
diff --git a/logger_test.go b/logger_test.go
index fd1dbcf..c8a9df9 100644
--- a/logger_test.go
+++ b/logger_test.go
@@ -1179,6 +1179,90 @@ func Test_logCache(t *testing.T) {
 	})
 }
 
+func TestCloseIdempotent(t *testing.T) {
+	t.Parallel()
+
+	api := newFakeAgentAPI(t)
+
+	ctx := testutil.Context(t, testutil.WaitShort)
+	agentURL, err := url.Parse(api.server.URL)
+	require.NoError(t, err)
+	namespace := "test-namespace"
+
+	client := fake.NewSimpleClientset()
+
+	cMock := quartz.NewMock(t)
+	reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{
+		client: client,
+		coderURL: agentURL,
+		namespaces: []string{namespace},
+		logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug),
+		logDebounce: 5 * time.Second,
+		clock: cMock,
+	})
+	require.NoError(t, err)
+
+	// First close should succeed
+	err = reporter.Close()
+	require.NoError(t, err)
+
+	// Second close should not panic (idempotent)
+	err = reporter.Close()
+	require.NoError(t, err)
+}
+
+func TestCloseDuringProcessing(t *testing.T) {
+	t.Parallel()
+
+	api := newFakeAgentAPI(t)
+
+	ctx := testutil.Context(t, testutil.WaitShort)
+	agentURL, err := url.Parse(api.server.URL)
+	require.NoError(t, err)
+	namespace := "test-namespace"
+
+	client := fake.NewSimpleClientset()
+
+	cMock := quartz.NewMock(t)
+	reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{
+		client: client,
+		coderURL: agentURL,
+		namespaces: []string{namespace},
+		logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug),
+		logDebounce: 5 * time.Second,
+		clock: cMock,
+	})
+	require.NoError(t, err)
+
+	// Create a pod to trigger processing
+	pod := &corev1.Pod{
+		ObjectMeta: v1.ObjectMeta{
+			Name: "test-pod-close",
+			Namespace: namespace,
+			CreationTimestamp: v1.Time{
+				Time: time.Now().Add(time.Hour),
+			},
+		},
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{{
+				Env: []corev1.EnvVar{{
+					Name: "CODER_AGENT_TOKEN",
+					Value: "test-token",
+				}},
+			}},
+		},
+	}
+	_, err = client.CoreV1().Pods(namespace).Create(ctx, pod, v1.CreateOptions{})
+	require.NoError(t, err)
+
+	// Wait for log source to be registered
+	_ = testutil.RequireReceive(ctx, t, api.logSource)
+
+	// Close while processing is active
+	err = reporter.Close()
+	require.NoError(t, err)
+}
+
 func newFakeAgentAPI(t *testing.T) *fakeAgentAPI {
 	logger := slogtest.Make(t, nil)
 	mux := drpcmux.New()

From cc9fb1a81b76bc4d8215113e299b7c74f2e6cef2 Mon Sep 17 00:00:00 2001
From: Kacper Sawicki
Date: Thu, 5 Feb 2026 09:36:56 +0000
Subject: [PATCH 2/5] fix: cleanup retry timers when work loop exits

Stop all active retry timers in logQueuer.cleanup() when the context is
canceled. This prevents retry timers from continuing to fire after the
podEventLogger is closed, which was causing integration tests to hang
waiting for httptest.Server to close.
---
 logger.go | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/logger.go b/logger.go
index 0c712a8..150dc6c 100644
--- a/logger.go
+++ b/logger.go
@@ -510,6 +510,8 @@ type logQueuer struct {
 }
 
 func (l *logQueuer) work(ctx context.Context) {
+	defer l.cleanup()
+
 	for ctx.Err() == nil {
 		select {
 		case log := <-l.q:
@@ -527,6 +529,19 @@ func (l *logQueuer) work(ctx context.Context) {
 	}
 }
 
+// cleanup stops all retry timers and cleans up resources when the work loop exits.
+func (l *logQueuer) cleanup() {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	for token, rs := range l.retries {
+		if rs != nil && rs.timer != nil {
+			rs.timer.Stop()
+		}
+		delete(l.retries, token)
+	}
+}
+
 func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) {
 	client := agentsdk.New(l.coderURL, agentsdk.WithFixedToken(log.agentToken))
 	logger := l.logger.With(slog.F("resource_name", log.resourceName))

From 715ebfa1a61bad8cb33f149c94df8e3a1904271f Mon Sep 17 00:00:00 2001
From: Kacper Sawicki
Date: Thu, 5 Feb 2026 12:12:18 +0000
Subject: [PATCH 3/5] fix: wait for work goroutine to exit in Close()

Address review feedback: Close() now waits for the work goroutine to
fully exit before returning. Added doneChan that is closed when the
work loop exits, and Close() blocks on receiving from it.

Also moved work goroutine startup to newPodEventLogger to ensure it's
only started once (not per-namespace).
---
 logger.go      | 13 ++++++++++---
 logger_test.go |  8 ++++----
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/logger.go b/logger.go
index 150dc6c..c12e61f 100644
--- a/logger.go
+++ b/logger.go
@@ -86,8 +86,12 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
 			},
 			maxRetries: opts.maxRetries,
 		},
+		doneChan: make(chan struct{}),
 	}
 
+	// Start the work goroutine once
+	go reporter.lq.work(reporter.ctx, reporter.doneChan)
+
 	// If no namespaces are provided, we listen for events in all namespaces.
 	if len(opts.namespaces) == 0 {
 		if err := reporter.initNamespace(""); err != nil {
@@ -122,6 +126,8 @@ type podEventLogger struct {
 
 	// closeOnce ensures Close() is idempotent
 	closeOnce sync.Once
+	// doneChan is closed when the work goroutine exits
+	doneChan chan struct{}
 }
 
 // resolveEnvValue resolves the value of an environment variable, supporting both
@@ -164,8 +170,6 @@ func (p *podEventLogger) initNamespace(namespace string) error {
 	// This is to prevent us from sending duplicate events.
 	startTime := time.Now()
 
-	go p.lq.work(p.ctx)
-
 	podFactory := informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(namespace), informers.WithTweakListOptions(func(lo *v1.ListOptions) {
 		lo.FieldSelector = p.fieldSelector
 		lo.LabelSelector = p.labelSelector
@@ -421,6 +425,8 @@ func (p *podEventLogger) Close() error {
 		close(p.stopChan)
 		close(p.errChan)
 	})
+	// Wait for the work goroutine to exit
+	<-p.doneChan
 	return nil
 }
 
@@ -509,8 +515,9 @@ type logQueuer struct {
 	maxRetries int
 }
 
-func (l *logQueuer) work(ctx context.Context) {
+func (l *logQueuer) work(ctx context.Context, done chan struct{}) {
 	defer l.cleanup()
+	defer close(done)
 
 	for ctx.Err() == nil {
 		select {
diff --git a/logger_test.go b/logger_test.go
index c8a9df9..df509e8 100644
--- a/logger_test.go
+++ b/logger_test.go
@@ -675,7 +675,7 @@ func Test_logQueuer(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 		defer cancel()
 
-		go lq.work(ctx)
+		go lq.work(ctx, make(chan struct{}))
 
 		ch <- agentLog{
 			op: opLog,
@@ -742,7 +742,7 @@ func Test_logQueuer(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 		defer cancel()
 
-		go lq.work(ctx)
+		go lq.work(ctx, make(chan struct{}))
 
 		token := "retry-token"
 		ch <- agentLog{
@@ -905,7 +905,7 @@ func Test_logQueuer(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		defer cancel()
 
-		go lq.work(ctx)
+		go lq.work(ctx, make(chan struct{}))
 
 		token := "max-retry-token"
 		ch <- agentLog{
@@ -1111,7 +1111,7 @@ func Test_logCache(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		defer cancel()
 
-		go lq.work(ctx)
+		go lq.work(ctx, make(chan struct{}))
 
 		token := "test-token"
 

From 48576958d2e8598c6befbc2ca0ea6daaf9f08eb3 Mon Sep 17 00:00:00 2001
From: Kacper Sawicki
Date: Thu, 5 Feb 2026 12:58:44 +0000
Subject: [PATCH 4/5] fix: reorder defers so cleanup runs before closing done channel

---
 logger.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/logger.go b/logger.go
index c12e61f..9d5c9b1 100644
--- a/logger.go
+++ b/logger.go
@@ -516,8 +516,8 @@ type logQueuer struct {
 }
 
 func (l *logQueuer) work(ctx context.Context, done chan struct{}) {
-	defer l.cleanup()
 	defer close(done)
+	defer l.cleanup()
 
 	for ctx.Err() == nil {
 		select {

From 29d40aeab875cbae97eb31a15a527e0f370a872a Mon Sep 17 00:00:00 2001
From: Kacper Sawicki
Date: Thu, 5 Feb 2026 13:00:35 +0000
Subject: [PATCH 5/5] fix: cleanup goroutine on initialization error

If initNamespace fails, cancel the context and wait for the work
goroutine to exit to prevent goroutine leaks.
---
 logger.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/logger.go b/logger.go
index 9d5c9b1..72e9346 100644
--- a/logger.go
+++ b/logger.go
@@ -95,11 +95,15 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
 	// If no namespaces are provided, we listen for events in all namespaces.
 	if len(opts.namespaces) == 0 {
 		if err := reporter.initNamespace(""); err != nil {
+			reporter.cancelFunc()
+			<-reporter.doneChan
 			return nil, fmt.Errorf("init namespace: %w", err)
 		}
 	} else {
 		for _, namespace := range opts.namespaces {
 			if err := reporter.initNamespace(namespace); err != nil {
+				reporter.cancelFunc()
+				<-reporter.doneChan
 				return nil, err
 			}
 		}
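
Taken together, the five patches amount to one shutdown pattern: Close() is guarded by sync.Once, cancels the worker's context, and then blocks on a done channel that the work goroutine closes only after it has stopped its retry timers; the constructor cancels and waits the same way if initialization fails. The standalone Go sketch below is only an illustrative approximation of that pattern, not code from logger.go: worker, newWorker, and timers are invented names for the example, while work, cleanup, closeOnce, cancelFunc, doneChan, and Close mirror identifiers in the diffs above.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker is a stand-in for the podEventLogger/logQueuer pair in the patches.
type worker struct {
	ctx        context.Context
	cancelFunc context.CancelFunc
	doneChan   chan struct{}
	closeOnce  sync.Once

	mu     sync.Mutex
	timers map[string]*time.Timer // stand-in for the retry timers
}

func newWorker(ctx context.Context) *worker {
	ctx, cancel := context.WithCancel(ctx)
	w := &worker{
		ctx:        ctx,
		cancelFunc: cancel,
		doneChan:   make(chan struct{}),
		timers:     map[string]*time.Timer{},
	}
	// The goroutine is started exactly once here, mirroring the move of the
	// work-loop startup into newPodEventLogger.
	go w.work()
	return w
}

func (w *worker) work() {
	// Deferred in this order so cleanup runs before doneChan is closed
	// (defers run last-in, first-out), mirroring the patch 4 reorder.
	defer close(w.doneChan)
	defer w.cleanup()

	<-w.ctx.Done() // stand-in for the select loop over work and retry channels
}

// cleanup stops any outstanding timers so nothing fires after shutdown.
func (w *worker) cleanup() {
	w.mu.Lock()
	defer w.mu.Unlock()
	for token, t := range w.timers {
		t.Stop()
		delete(w.timers, token)
	}
}

// Close is idempotent and blocks until the work goroutine has exited.
func (w *worker) Close() error {
	w.closeOnce.Do(w.cancelFunc)
	<-w.doneChan
	return nil
}

func main() {
	w := newWorker(context.Background())
	_ = w.Close()
	_ = w.Close() // second call is a no-op and returns immediately
	fmt.Println("closed cleanly")
}

Because close on the done channel is deferred before cleanup, the last-in, first-out defer order runs cleanup first and only then signals done; that ordering is what lets Close(), and the error path in the constructor, safely block on the channel without racing the timer teardown.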