Skip to content

Transport action changes for streams write restrictions #130742

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

lukewhiting
Copy link
Contributor

@lukewhiting lukewhiting commented Jul 7, 2025

Very early PR for restricting writes to substreams (<streamName>.*) via the single document, bulk document and reroute pipeline processors.

Discussion to be had with product on if additional restrictions (Such as changing routing in painless scripts) also needs to be restricted and the additional cost of that.

Fixes ES-11941

@lukewhiting lukewhiting changed the title Es 11941 streams logs bulk transport changes Transport action changes for streams write restrictions Jul 7, 2025
@lukewhiting lukewhiting requested a review from Copilot July 7, 2025 16:04
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces write restrictions for substreams (e.g., <streamName>.*) across single-document indexing, bulk operations, and ingest pipeline reroute processors. It adds a StreamsPermissionsUtils helper for checking and enforcing these restrictions, integrates checks in BulkOperation and RerouteProcessor, and extends tests and REST specs for validating the new behavior.

  • Added StreamType enum and StreamsPermissionsUtils singleton with streamTypeIsEnabled and throwIfRetrouteToSubstreamNotAllowed.
  • Updated BulkOperation and its factory/tests to inject and apply StreamsPermissionsUtils for blocking writes to child streams when enabled.
  • Enhanced RerouteProcessor and its factory/tests to enforce reroute restrictions, and added YAML REST tests under modules/streams.

Reviewed Changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
server/src/main/java/org/elasticsearch/common/streams/StreamType.java Introduces a new enum for stream names (currently only LOGS).
server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java Adds singleton utility to check stream enablement and block reroutes.
server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java Injects and uses StreamsPermissionsUtils to filter bulk substream writes.
server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java Unit tests for new permissions utility.
server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java Extended tests for bulk write restrictions and mocks for new utility.
modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml REST YAML tests validating substream write errors.
modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java Adjusts test cluster modules to include ingest-common.
modules/streams/build.gradle Exports common.streams package and updates REST resource includes.
modules/ingest-common/src/java/.../RerouteProcessor*.java Updates RerouteProcessor and factory to invoke StreamsPermissionsUtils.

return INSTANCE;
}

public void throwIfRetrouteToSubstreamNotAllowed(ProjectMetadata projectMetadata, Set<String> indexHistory, String destination)
Copy link
Preview

Copilot AI Jul 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method name 'throwIfRetrouteToSubstreamNotAllowed' contains a typo: 'Retroute' should be 'Reroute' to match intended behavior and naming consistency.

Suggested change
public void throwIfRetrouteToSubstreamNotAllowed(ProjectMetadata projectMetadata, Set<String> indexHistory, String destination)
public void throwIfRerouteToSubstreamNotAllowed(ProjectMetadata projectMetadata, Set<String> indexHistory, String destination)

Copilot uses AI. Check for mistakes.

ds4FailureStore1
)
)
.indices(new HashMap<>() {
Copy link
Preview

Copilot AI Jul 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider avoiding double-brace initialization for the indices map, as it creates an anonymous subclass; prefer Map.ofEntries(...) or a standard HashMap with explicit put calls.

Copilot uses AI. Check for mistakes.

@@ -215,10 +250,18 @@ public class BulkOperationTests extends ESTestCase {

private TestThreadPool threadPool;

private StreamsPermissionsUtils streamsPermissionsUtilsMock;
private ProjectResolver projectResolverMock;
Copy link
Preview

Copilot AI Jul 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Field 'projectResolverMock' is declared but not used in this test class; consider removing it to reduce clutter.

Suggested change
private ProjectResolver projectResolverMock;
// Removed the unused projectResolverMock field.

Copilot uses AI. Check for mistakes.

return INSTANCE;
}

public void throwIfRetrouteToSubstreamNotAllowed(ProjectMetadata projectMetadata, Set<String> indexHistory, String destination)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor thing, but you misspelled Reroute

Comment on lines +286 to +309
ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterState);

Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values())
.filter(t -> streamsPermissionsUtils.streamTypeIsEnabled(t, projectMetadata))
.collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));

for (StreamType streamType : enabledStreamTypes) {
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> req = bulkRequest.requests.get(i);
String prefix = streamType.getStreamName() + ".";
if (req != null && req.index().startsWith(prefix)) {
IllegalArgumentException exception = new IllegalArgumentException(
"Writes to child stream ["
+ req.index()
+ "] are not allowed, use the parent stream instead: ["
+ streamType.getStreamName()
+ "]"
);
IndexDocFailureStoreStatus failureStoreStatus = processFailure(new BulkItemRequest(i, req), projectMetadata, exception);
addFailureAndDiscardRequest(req, i, req.index(), exception, failureStoreStatus);
}
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're going to want to move this logic to somewhere earlier, like in TransportAbstractBulkAction::doRun, just before calling applyPipelinesAndDoInternalExecute. The reason is that at this point in BulkOperation the req.index() will already be the final destination, which could very well be a substream. And we have no history. Take this example:

PUT _ingest/pipeline/pipeline1
{
   "processors" : [
    {
      "reroute" : {
        "destination": "logs.abc"
      }
    }
  ]
}

PUT _ingest/pipeline/pipeline2
{
   "processors" : [
    {
      "reroute" : {
        "destination": "logs.abc.def"
      }
    }
  ]
}

PUT logs
{
    "settings": {
    "index.default_pipeline": "pipeline1",
    "number_of_replicas": 0
  }
}

PUT logs.abc
{
    "settings": {
    "index.default_pipeline": "pipeline2",
    "number_of_replicas": 0
  }
}

PUT logs/_bulk?_source=true
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }
{ "delete" : { "_id" : "vYQUpJcBmbnJKL4xPLSK" } }

I would expect the first entry to successfully write into logs.abc.def, and the second one to fail (since we don't allow delete). But the first one fails (and so does the 2nd one, but only b/c the entry doesn't exist).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants