|
8 | 8 |
|
9 | 9 | package org.elasticsearch.search.profile.query; |
10 | 10 |
|
| 11 | +import org.apache.lucene.sandbox.search.ProfilerCollector; |
11 | 12 | import org.apache.lucene.search.Collector; |
12 | 13 | import org.apache.lucene.search.CollectorManager; |
13 | 14 |
|
14 | 15 | import java.io.IOException; |
| 16 | +import java.util.ArrayList; |
15 | 17 | import java.util.Collection; |
16 | | -import java.util.Collections; |
17 | 18 | import java.util.List; |
18 | | -import java.util.stream.Collectors; |
19 | 19 |
|
20 | 20 | /** |
21 | 21 | * A {@link CollectorManager} that takes another CollectorManager as input and wraps all Collectors generated by it |
22 | 22 | * in an {@link InternalProfileCollector}. It delegates all the profiling to the generated collectors via {@link #getCollectorTree()} |
23 | | - * and joins them up when its {@link #reduce} method is called. The profile result can |
| 23 | + * and joins the different collector trees together when its {@link #reduce} method is called. |
| 24 | + * Supports optionally providing sub-collector managers for top docs as well as aggs collection, so that each |
| 25 | + * {@link InternalProfileCollector} created is provided with the corresponding sub-collectors that are children of the top-level collector. |
| 26 | + * @param <T> the return type of the wrapped collector manager, which the reduce method returns. |
24 | 27 | */ |
25 | 28 | public final class ProfileCollectorManager<T> implements CollectorManager<InternalProfileCollector, T> { |
26 | 29 |
|
27 | | - private final CollectorManager<Collector, T> collectorManager; |
| 30 | + private final CollectorManager<? extends Collector, T> collectorManager; |
28 | 31 | private final String reason; |
| 32 | + private final ProfileCollectorManager<?> topDocsSubCollectorManager; |
| 33 | + private final ProfileCollectorManager<?> aggsSubCollectorManager; |
| 34 | + // this is a bit of a hack: it allows us to retrieve the last collector that newCollector has returned for sub-collector managers, |
| 35 | + // so that we can provide them to InternalProfileCollector's constructor as children. This is fine as newCollector does not get called |
| 36 | + // concurrently, but rather in advance before parallelizing the collection |
| 37 | + private InternalProfileCollector profileCollector; |
| 38 | + |
29 | 39 | private CollectorResult collectorTree; |
30 | 40 |
|
31 | | - @SuppressWarnings("unchecked") |
32 | 41 | public ProfileCollectorManager(CollectorManager<? extends Collector, T> collectorManager, String reason) { |
33 | | - this.collectorManager = (CollectorManager<Collector, T>) collectorManager; |
| 42 | + this(collectorManager, reason, null, null); |
| 43 | + } |
| 44 | + |
| 45 | + public ProfileCollectorManager( |
| 46 | + CollectorManager<? extends Collector, T> collectorManager, |
| 47 | + String reason, |
| 48 | + ProfileCollectorManager<?> topDocsSubCollectorManager, |
| 49 | + ProfileCollectorManager<?> aggsSubCollectorManager |
| 50 | + ) { |
| 51 | + this.collectorManager = collectorManager; |
34 | 52 | this.reason = reason; |
| 53 | + assert assertSubCollectorManagers() : "top docs manager is null while aggs manager isn't"; |
| 54 | + this.topDocsSubCollectorManager = topDocsSubCollectorManager; |
| 55 | + this.aggsSubCollectorManager = aggsSubCollectorManager; |
| 56 | + } |
| 57 | + |
| 58 | + private boolean assertSubCollectorManagers() { |
| 59 | + if (aggsSubCollectorManager != null) { |
| 60 | + return topDocsSubCollectorManager != null; |
| 61 | + } |
| 62 | + return true; |
35 | 63 | } |
36 | 64 |
|
37 | 65 | @Override |
38 | 66 | public InternalProfileCollector newCollector() throws IOException { |
39 | | - return new InternalProfileCollector(collectorManager.newCollector(), reason); |
| 67 | + Collector collector = collectorManager.newCollector(); |
| 68 | + if (aggsSubCollectorManager == null && topDocsSubCollectorManager == null) { |
| 69 | + profileCollector = new InternalProfileCollector(collector, reason); |
| 70 | + } else if (aggsSubCollectorManager == null) { |
| 71 | + assert topDocsSubCollectorManager.profileCollector != null; |
| 72 | + profileCollector = new InternalProfileCollector(collector, reason, topDocsSubCollectorManager.profileCollector); |
| 73 | + } else { |
| 74 | + assert topDocsSubCollectorManager.profileCollector != null && aggsSubCollectorManager.profileCollector != null; |
| 75 | + profileCollector = new InternalProfileCollector( |
| 76 | + collector, |
| 77 | + reason, |
| 78 | + topDocsSubCollectorManager.profileCollector, |
| 79 | + aggsSubCollectorManager.profileCollector |
| 80 | + ); |
| 81 | + } |
| 82 | + return profileCollector; |
40 | 83 | } |
41 | 84 |
|
| 85 | + @Override |
42 | 86 | public T reduce(Collection<InternalProfileCollector> profileCollectors) throws IOException { |
43 | 87 | assert profileCollectors.size() > 0 : "at least one collector expected"; |
44 | | - List<Collector> unwrapped = profileCollectors.stream() |
45 | | - .map(InternalProfileCollector::getWrappedCollector) |
46 | | - .collect(Collectors.toList()); |
47 | | - T returnValue = collectorManager.reduce(unwrapped); |
48 | | - |
49 | | - List<CollectorResult> resultsPerProfiler = profileCollectors.stream() |
50 | | - .map(ipc -> ipc.getCollectorTree()) |
51 | | - .collect(Collectors.toList()); |
| 88 | + List<Collector> unwrapped = profileCollectors.stream().map(InternalProfileCollector::getWrappedCollector).toList(); |
| 89 | + @SuppressWarnings("unchecked") |
| 90 | + CollectorManager<Collector, T> cm = (CollectorManager<Collector, T>) collectorManager; |
| 91 | + T returnValue = cm.reduce(unwrapped); |
52 | 92 |
|
| 93 | + List<CollectorResult> resultsPerProfiler = profileCollectors.stream().map(InternalProfileCollector::getCollectorTree).toList(); |
53 | 94 | long totalTime = resultsPerProfiler.stream().map(CollectorResult::getTime).reduce(0L, Long::sum); |
54 | 95 | String collectorName = resultsPerProfiler.get(0).getName(); |
55 | | - this.collectorTree = new CollectorResult(collectorName, reason, totalTime, Collections.emptyList()); |
| 96 | + assert profileCollectors.stream().map(ProfilerCollector::getReason).allMatch(reason::equals); |
| 97 | + assert profileCollectors.stream().map(ProfilerCollector::getName).allMatch(collectorName::equals); |
| 98 | + assert assertChildrenSize(resultsPerProfiler); |
| 99 | + |
| 100 | + List<CollectorResult> childrenResults = new ArrayList<>(); |
| 101 | + // for the children collector managers, we rely on the chain on reduce calls to make their collector results available |
| 102 | + if (topDocsSubCollectorManager != null) { |
| 103 | + childrenResults.add(topDocsSubCollectorManager.getCollectorTree()); |
| 104 | + } |
| 105 | + if (aggsSubCollectorManager != null) { |
| 106 | + childrenResults.add(aggsSubCollectorManager.getCollectorTree()); |
| 107 | + } |
| 108 | + this.collectorTree = new CollectorResult(collectorName, reason, totalTime, childrenResults); |
| 109 | + |
56 | 110 | return returnValue; |
57 | 111 | } |
58 | 112 |
|
| 113 | + private boolean assertChildrenSize(List<CollectorResult> resultsPerProfiler) { |
| 114 | + int expectedSize = 0; |
| 115 | + if (topDocsSubCollectorManager != null) { |
| 116 | + expectedSize++; |
| 117 | + } |
| 118 | + if (aggsSubCollectorManager != null) { |
| 119 | + expectedSize++; |
| 120 | + } |
| 121 | + final int expectedChildrenSize = expectedSize; |
| 122 | + return resultsPerProfiler.stream() |
| 123 | + .map(collectorResult -> collectorResult.getChildrenResults().size()) |
| 124 | + .allMatch(integer -> integer == expectedChildrenSize); |
| 125 | + } |
| 126 | + |
59 | 127 | public CollectorResult getCollectorTree() { |
60 | 128 | if (this.collectorTree == null) { |
61 | 129 | throw new IllegalStateException("A collectorTree hasn't been set yet. Call reduce() before attempting to retrieve it"); |
|
0 commit comments