Skip to content

Commit 4abb86f

Browse files
committed
Update cloudwatch logs receiver with latest upstream changes
Starting with Alloy 1.9.x the cloudwatch logs receiver starts reading from the start which causes duplicate logs in case of restarts. * Add `start_from` arg * Add storage capabilities Relates-To: open-telemetry/opentelemetry-collector-contrib#39007
1 parent d7a153c commit 4abb86f

File tree

5 files changed

+71
-10
lines changed

5 files changed

+71
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Main (unreleased)
4141

4242
- Add support for `conditions` and statement-specific `error_mode` in `otelcol.processor.transform`. (@ptodev)
4343

44+
- Add `storage` and `start_from` args to cloudwatch logs receiver. (@boernd)
4445

4546
### Bugfixes
4647

docs/sources/reference/components/otelcol/otelcol.receiver.awscloudwatch.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,12 @@ otelcol.receiver.awscloudwatch "<LABEL>" {
3939

4040
You can use the following arguments with `otelcol.receiver.awscloudwatch`:
4141

42-
| Name | Type | Description | Default | Required |
43-
| --------------- | -------- | -------------------------------- | ------- | -------- |
44-
| `region` | `string` | AWS region to collect logs from. | | yes |
45-
| `imds_endpoint` | `string` | Custom EC2 IMDS endpoint to use. | | no |
46-
| `profile` | `string` | AWS credentials profile to use. | | no |
42+
| Name | Type | Description | Default | Required |
43+
| --------------- | -------------------------- | ------------------------------------------------------------------------- | ------- | -------- |
44+
| `region` | `string` | AWS region to collect logs from. | | yes |
45+
| `imds_endpoint` | `string` | Custom EC2 IMDS endpoint to use. | | no |
46+
| `profile` | `string` | AWS credentials profile to use. | | no |
47+
| `storage` | `capsule(otelcol.Handler)` | Handler from an `otelcol.storage` component to use for persisting state. | | no |
4748

4849
If `imds_endpoint` isn't specified, and the environment variable `AWS_EC2_METADATA_SERVICE_ENDPOINT` has a value, it will be used as the IMDS endpoint.
4950

@@ -81,6 +82,7 @@ The following arguments are supported:
8182
| ------------------------ | ---------- | -------------------------------------------------------------- | ------- | -------- |
8283
| `max_events_per_request` | `int` | Maximum number of events to process per request to CloudWatch. | `1000` | no |
8384
| `poll_interval` | `duration` | How frequently to poll for new log entries. | `"1m"` | no |
85+
| `start_from` | `string` | Timestamp in RFC3339 format where to start reading logs. | `""` | no |
8486

8587
The `logs` block supports the following blocks:
8688

internal/component/otelcol/receiver/awscloudwatch/awscloudwatch.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22
package awscloudwatch
33

44
import (
5+
"fmt"
6+
57
"github.com/grafana/alloy/internal/component"
68
"github.com/grafana/alloy/internal/component/otelcol"
79
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
10+
"github.com/grafana/alloy/internal/component/otelcol/extension"
811
"github.com/grafana/alloy/internal/component/otelcol/receiver"
912
"github.com/grafana/alloy/internal/featuregate"
1013
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchreceiver"
@@ -29,10 +32,11 @@ func init() {
2932

3033
// Arguments configures the otelcol.receiver.awscloudwatch component.
3134
type Arguments struct {
32-
Region string `alloy:"region,attr"`
33-
Profile string `alloy:"profile,attr,optional"`
34-
IMDSEndpoint string `alloy:"imds_endpoint,attr,optional"`
35-
Logs LogsConfig `alloy:"logs,block,optional"`
35+
Region string `alloy:"region,attr"`
36+
Profile string `alloy:"profile,attr,optional"`
37+
IMDSEndpoint string `alloy:"imds_endpoint,attr,optional"`
38+
Logs LogsConfig `alloy:"logs,block,optional"`
39+
Storage *extension.ExtensionHandler `alloy:"storage,attr,optional"`
3640

3741
DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"`
3842

@@ -79,12 +83,25 @@ func (args Arguments) Convert() (otelcomponent.Config, error) {
7983
otelConfig.Logs.Groups.AutodiscoverConfig.Limit = defaultLogGroupLimit
8084
}
8185

86+
// Configure storage if args.Storage is set.
87+
if args.Storage != nil {
88+
if args.Storage.Extension == nil {
89+
return nil, fmt.Errorf("missing storage extension")
90+
}
91+
92+
otelConfig.StorageID = &args.Storage.ID
93+
}
94+
8295
return otelConfig, nil
8396
}
8497

8598
// Extensions implements receiver.Arguments.
8699
func (args Arguments) Extensions() map[otelcomponent.ID]otelcomponent.Component {
87-
return nil
100+
m := make(map[otelcomponent.ID]otelcomponent.Component)
101+
if args.Storage != nil {
102+
m[args.Storage.ID] = args.Storage.Extension
103+
}
104+
return m
88105
}
89106

90107
// Exporters implements receiver.Arguments.

internal/component/otelcol/receiver/awscloudwatch/awscloudwatch_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func TestArguments_UnmarshalAlloy(t *testing.T) {
2525
expected: awscloudwatchreceiver.Config{
2626
Region: "us-west-2",
2727
Logs: &awscloudwatchreceiver.LogsConfig{
28+
StartFrom: "",
2829
PollInterval: time.Minute,
2930
MaxEventsPerRequest: 1000,
3031
Groups: awscloudwatchreceiver.GroupConfig{
@@ -156,6 +157,32 @@ func TestArguments_UnmarshalAlloy(t *testing.T) {
156157
},
157158
},
158159
},
160+
{
161+
testName: "start_from configuration set",
162+
cfg: `
163+
region = "us-west-2"
164+
logs {
165+
poll_interval = "1m"
166+
start_from = "2025-06-25T00:00:00Z"
167+
}
168+
output {}
169+
`,
170+
expected: awscloudwatchreceiver.Config{
171+
Region: "us-west-2",
172+
Logs: &awscloudwatchreceiver.LogsConfig{
173+
StartFrom: "2025-06-25T00:00:00Z",
174+
PollInterval: time.Minute,
175+
MaxEventsPerRequest: 1000,
176+
Groups: awscloudwatchreceiver.GroupConfig{
177+
AutodiscoverConfig: &awscloudwatchreceiver.AutodiscoverConfig{
178+
Limit: 50,
179+
Streams: awscloudwatchreceiver.StreamConfig{},
180+
},
181+
NamedConfigs: map[string]awscloudwatchreceiver.StreamConfig{},
182+
},
183+
},
184+
},
185+
},
159186
}
160187

161188
for _, tc := range tests {
@@ -233,6 +260,18 @@ func TestArguments_Validate(t *testing.T) {
233260
`,
234261
expectedError: "both autodiscover and named configs are configured, Only one or the other is permitted",
235262
},
263+
{
264+
testName: "invalid start_from configuration set",
265+
cfg: `
266+
region = "us-west-2"
267+
logs {
268+
poll_interval = "1m"
269+
start_from = "earliest"
270+
}
271+
output {}
272+
`,
273+
expectedError: "invalid start_from time format",
274+
},
236275
}
237276

238277
for _, tc := range tests {

internal/component/otelcol/receiver/awscloudwatch/config_awscloudwatch.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type LogsConfig struct {
1313
PollInterval time.Duration `alloy:"poll_interval,attr,optional"`
1414
MaxEventsPerRequest int `alloy:"max_events_per_request,attr,optional"`
1515
Groups GroupConfig `alloy:"groups,block,optional"`
16+
StartFrom string `alloy:"start_from,attr,optional"`
1617
}
1718

1819
func (args *LogsConfig) Convert() *awscloudwatchreceiver.LogsConfig {
@@ -23,6 +24,7 @@ func (args *LogsConfig) Convert() *awscloudwatchreceiver.LogsConfig {
2324
PollInterval: args.PollInterval,
2425
MaxEventsPerRequest: args.MaxEventsPerRequest,
2526
Groups: args.Groups.Convert(),
27+
StartFrom: args.StartFrom,
2628
}
2729
}
2830

0 commit comments

Comments
 (0)