-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Transport action changes for streams write restrictions #130742
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Transport action changes for streams write restrictions #130742
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces write restrictions for substreams (e.g., <streamName>.*
) across single-document indexing, bulk operations, and ingest pipeline reroute processors. It adds a StreamsPermissionsUtils
helper for checking and enforcing these restrictions, integrates checks in BulkOperation
and RerouteProcessor
, and extends tests and REST specs for validating the new behavior.
- Added
StreamType
enum andStreamsPermissionsUtils
singleton withstreamTypeIsEnabled
andthrowIfRetrouteToSubstreamNotAllowed
. - Updated
BulkOperation
and its factory/tests to inject and applyStreamsPermissionsUtils
for blocking writes to child streams when enabled. - Enhanced
RerouteProcessor
and its factory/tests to enforce reroute restrictions, and added YAML REST tests undermodules/streams
.
Reviewed Changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.
Show a summary per file
File | Description |
---|---|
server/src/main/java/org/elasticsearch/common/streams/StreamType.java | Introduces a new enum for stream names (currently only LOGS ). |
server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java | Adds singleton utility to check stream enablement and block reroutes. |
server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java | Injects and uses StreamsPermissionsUtils to filter bulk substream writes. |
server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java | Unit tests for new permissions utility. |
server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java | Extended tests for bulk write restrictions and mocks for new utility. |
modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml | REST YAML tests validating substream write errors. |
modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java | Adjusts test cluster modules to include ingest-common . |
modules/streams/build.gradle | Exports common.streams package and updates REST resource includes. |
modules/ingest-common/src/java/.../RerouteProcessor*.java | Updates RerouteProcessor and factory to invoke StreamsPermissionsUtils . |
return INSTANCE; | ||
} | ||
|
||
public void throwIfRetrouteToSubstreamNotAllowed(ProjectMetadata projectMetadata, Set<String> indexHistory, String destination) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Method name 'throwIfRetrouteToSubstreamNotAllowed' contains a typo: 'Retroute' should be 'Reroute' to match intended behavior and naming consistency.
public void throwIfRetrouteToSubstreamNotAllowed(ProjectMetadata projectMetadata, Set<String> indexHistory, String destination) | |
public void throwIfRerouteToSubstreamNotAllowed(ProjectMetadata projectMetadata, Set<String> indexHistory, String destination) |
Copilot uses AI. Check for mistakes.
ds4FailureStore1 | ||
) | ||
) | ||
.indices(new HashMap<>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Consider avoiding double-brace initialization for the indices map, as it creates an anonymous subclass; prefer Map.ofEntries(...)
or a standard HashMap
with explicit put
calls.
Copilot uses AI. Check for mistakes.
@@ -215,10 +250,18 @@ public class BulkOperationTests extends ESTestCase { | |||
|
|||
private TestThreadPool threadPool; | |||
|
|||
private StreamsPermissionsUtils streamsPermissionsUtilsMock; | |||
private ProjectResolver projectResolverMock; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Field 'projectResolverMock' is declared but not used in this test class; consider removing it to reduce clutter.
private ProjectResolver projectResolverMock; | |
// Removed the unused projectResolverMock field. |
Copilot uses AI. Check for mistakes.
return INSTANCE; | ||
} | ||
|
||
public void throwIfRetrouteToSubstreamNotAllowed(ProjectMetadata projectMetadata, Set<String> indexHistory, String destination) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor thing, but you misspelled Reroute
ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterState); | ||
|
||
Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values()) | ||
.filter(t -> streamsPermissionsUtils.streamTypeIsEnabled(t, projectMetadata)) | ||
.collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class))); | ||
|
||
for (StreamType streamType : enabledStreamTypes) { | ||
for (int i = 0; i < bulkRequest.requests.size(); i++) { | ||
DocWriteRequest<?> req = bulkRequest.requests.get(i); | ||
String prefix = streamType.getStreamName() + "."; | ||
if (req != null && req.index().startsWith(prefix)) { | ||
IllegalArgumentException exception = new IllegalArgumentException( | ||
"Writes to child stream [" | ||
+ req.index() | ||
+ "] are not allowed, use the parent stream instead: [" | ||
+ streamType.getStreamName() | ||
+ "]" | ||
); | ||
IndexDocFailureStoreStatus failureStoreStatus = processFailure(new BulkItemRequest(i, req), projectMetadata, exception); | ||
addFailureAndDiscardRequest(req, i, req.index(), exception, failureStoreStatus); | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you're going to want to move this logic to somewhere earlier, like in TransportAbstractBulkAction::doRun
, just before calling applyPipelinesAndDoInternalExecute
. The reason is that at this point in BulkOperation the req.index()
will already be the final destination, which could very well be a substream. And we have no history. Take this example:
PUT _ingest/pipeline/pipeline1
{
"processors" : [
{
"reroute" : {
"destination": "logs.abc"
}
}
]
}
PUT _ingest/pipeline/pipeline2
{
"processors" : [
{
"reroute" : {
"destination": "logs.abc.def"
}
}
]
}
PUT logs
{
"settings": {
"index.default_pipeline": "pipeline1",
"number_of_replicas": 0
}
}
PUT logs.abc
{
"settings": {
"index.default_pipeline": "pipeline2",
"number_of_replicas": 0
}
}
PUT logs/_bulk?_source=true
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }
{ "delete" : { "_id" : "vYQUpJcBmbnJKL4xPLSK" } }
I would expect the first entry to successfully write into logs.abc.def, and the second one to fail (since we don't allow delete). But the first one fails (and so does the 2nd one, but only b/c the entry doesn't exist).
Very early PR for restricting writes to substreams (
<streamName>.*
) via the single document, bulk document and reroute pipeline processors.Discussion to be had with product on if additional restrictions (Such as changing routing in painless scripts) also needs to be restricted and the additional cost of that.
Fixes ES-11941