Skip to content

Commit c6718de

Browse files
authored
Update cloudwatch logs receiver with latest upstream changes (#3909)
Starting with Alloy 1.9.x the cloudwatch logs receiver starts reading from the start which causes duplicate logs in case of restarts. * Add `start_from` arg * Add storage capabilities Relates-To: open-telemetry/opentelemetry-collector-contrib#39007
1 parent d89936b commit c6718de

File tree

5 files changed

+71
-10
lines changed

5 files changed

+71
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Main (unreleased)
4141

4242
- Add support for `conditions` and statement-specific `error_mode` in `otelcol.processor.transform`. (@ptodev)
4343

44+
- Add `storage` and `start_from` args to cloudwatch logs receiver. (@boernd)
4445

4546
### Bugfixes
4647

docs/sources/reference/components/otelcol/otelcol.receiver.awscloudwatch.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,12 @@ otelcol.receiver.awscloudwatch "<LABEL>" {
3939

4040
You can use the following arguments with `otelcol.receiver.awscloudwatch`:
4141

42-
| Name | Type | Description | Default | Required |
43-
| --------------- | -------- | -------------------------------- | ------- | -------- |
44-
| `region` | `string` | AWS region to collect logs from. | | yes |
45-
| `imds_endpoint` | `string` | Custom EC2 IMDS endpoint to use. | | no |
46-
| `profile` | `string` | AWS credentials profile to use. | | no |
42+
| Name | Type | Description | Default | Required |
43+
| --------------- | -------------------------- | ------------------------------------------------------------------------- | ------- | -------- |
44+
| `region` | `string` | AWS region to collect logs from. | | yes |
45+
| `imds_endpoint` | `string` | Custom EC2 IMDS endpoint to use. | | no |
46+
| `profile` | `string` | AWS credentials profile to use. | | no |
47+
| `storage` | `capsule(otelcol.Handler)` | Handler from an `otelcol.storage` component to use for persisting state. | | no |
4748

4849
If `imds_endpoint` isn't specified, and the environment variable `AWS_EC2_METADATA_SERVICE_ENDPOINT` has a value, it will be used as the IMDS endpoint.
4950

@@ -81,6 +82,7 @@ The following arguments are supported:
8182
| ------------------------ | ---------- | -------------------------------------------------------------- | ------- | -------- |
8283
| `max_events_per_request` | `int` | Maximum number of events to process per request to CloudWatch. | `1000` | no |
8384
| `poll_interval` | `duration` | How frequently to poll for new log entries. | `"1m"` | no |
85+
| `start_from` | `string` | Timestamp in RFC3339 format where to start reading logs. | `""` | no |
8486

8587
The `logs` block supports the following blocks:
8688

internal/component/otelcol/receiver/awscloudwatch/awscloudwatch.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22
package awscloudwatch
33

44
import (
5+
"fmt"
6+
57
"github.com/grafana/alloy/internal/component"
68
"github.com/grafana/alloy/internal/component/otelcol"
79
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
10+
"github.com/grafana/alloy/internal/component/otelcol/extension"
811
"github.com/grafana/alloy/internal/component/otelcol/receiver"
912
"github.com/grafana/alloy/internal/featuregate"
1013
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchreceiver"
@@ -29,10 +32,11 @@ func init() {
2932

3033
// Arguments configures the otelcol.receiver.awscloudwatch component.
3134
type Arguments struct {
32-
Region string `alloy:"region,attr"`
33-
Profile string `alloy:"profile,attr,optional"`
34-
IMDSEndpoint string `alloy:"imds_endpoint,attr,optional"`
35-
Logs LogsConfig `alloy:"logs,block,optional"`
35+
Region string `alloy:"region,attr"`
36+
Profile string `alloy:"profile,attr,optional"`
37+
IMDSEndpoint string `alloy:"imds_endpoint,attr,optional"`
38+
Logs LogsConfig `alloy:"logs,block,optional"`
39+
Storage *extension.ExtensionHandler `alloy:"storage,attr,optional"`
3640

3741
DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"`
3842

@@ -79,12 +83,25 @@ func (args Arguments) Convert() (otelcomponent.Config, error) {
7983
otelConfig.Logs.Groups.AutodiscoverConfig.Limit = defaultLogGroupLimit
8084
}
8185

86+
// Configure storage if args.Storage is set.
87+
if args.Storage != nil {
88+
if args.Storage.Extension == nil {
89+
return nil, fmt.Errorf("missing storage extension")
90+
}
91+
92+
otelConfig.StorageID = &args.Storage.ID
93+
}
94+
8295
return otelConfig, nil
8396
}
8497

8598
// Extensions implements receiver.Arguments.
8699
func (args Arguments) Extensions() map[otelcomponent.ID]otelcomponent.Component {
87-
return nil
100+
m := make(map[otelcomponent.ID]otelcomponent.Component)
101+
if args.Storage != nil {
102+
m[args.Storage.ID] = args.Storage.Extension
103+
}
104+
return m
88105
}
89106

90107
// Exporters implements receiver.Arguments.

internal/component/otelcol/receiver/awscloudwatch/awscloudwatch_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func TestArguments_UnmarshalAlloy(t *testing.T) {
2525
expected: awscloudwatchreceiver.Config{
2626
Region: "us-west-2",
2727
Logs: &awscloudwatchreceiver.LogsConfig{
28+
StartFrom: "",
2829
PollInterval: time.Minute,
2930
MaxEventsPerRequest: 1000,
3031
Groups: awscloudwatchreceiver.GroupConfig{
@@ -156,6 +157,32 @@ func TestArguments_UnmarshalAlloy(t *testing.T) {
156157
},
157158
},
158159
},
160+
{
161+
testName: "start_from configuration set",
162+
cfg: `
163+
region = "us-west-2"
164+
logs {
165+
poll_interval = "1m"
166+
start_from = "2025-06-25T00:00:00Z"
167+
}
168+
output {}
169+
`,
170+
expected: awscloudwatchreceiver.Config{
171+
Region: "us-west-2",
172+
Logs: &awscloudwatchreceiver.LogsConfig{
173+
StartFrom: "2025-06-25T00:00:00Z",
174+
PollInterval: time.Minute,
175+
MaxEventsPerRequest: 1000,
176+
Groups: awscloudwatchreceiver.GroupConfig{
177+
AutodiscoverConfig: &awscloudwatchreceiver.AutodiscoverConfig{
178+
Limit: 50,
179+
Streams: awscloudwatchreceiver.StreamConfig{},
180+
},
181+
NamedConfigs: map[string]awscloudwatchreceiver.StreamConfig{},
182+
},
183+
},
184+
},
185+
},
159186
}
160187

161188
for _, tc := range tests {
@@ -233,6 +260,18 @@ func TestArguments_Validate(t *testing.T) {
233260
`,
234261
expectedError: "both autodiscover and named configs are configured, Only one or the other is permitted",
235262
},
263+
{
264+
testName: "invalid start_from configuration set",
265+
cfg: `
266+
region = "us-west-2"
267+
logs {
268+
poll_interval = "1m"
269+
start_from = "earliest"
270+
}
271+
output {}
272+
`,
273+
expectedError: "invalid start_from time format",
274+
},
236275
}
237276

238277
for _, tc := range tests {

internal/component/otelcol/receiver/awscloudwatch/config_awscloudwatch.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type LogsConfig struct {
1313
PollInterval time.Duration `alloy:"poll_interval,attr,optional"`
1414
MaxEventsPerRequest int `alloy:"max_events_per_request,attr,optional"`
1515
Groups GroupConfig `alloy:"groups,block,optional"`
16+
StartFrom string `alloy:"start_from,attr,optional"`
1617
}
1718

1819
func (args *LogsConfig) Convert() *awscloudwatchreceiver.LogsConfig {
@@ -23,6 +24,7 @@ func (args *LogsConfig) Convert() *awscloudwatchreceiver.LogsConfig {
2324
PollInterval: args.PollInterval,
2425
MaxEventsPerRequest: args.MaxEventsPerRequest,
2526
Groups: args.Groups.Convert(),
27+
StartFrom: args.StartFrom,
2628
}
2729
}
2830

0 commit comments

Comments
 (0)