Skip to content

Commit 6455bc3

Browse files
authored
IGNITE-19998 SQL Calcite: Add support of setting partitions in SqlFieldsQuery (apache#10870)
1 parent f025cc1 commit 6455bc3

File tree

9 files changed

+391
-27
lines changed

9 files changed

+391
-27
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,7 @@ private <T> T processQuery(
564564
params,
565565
qryCtx,
566566
fldsQry != null && fldsQry.isLocal(),
567+
fldsQry != null ? fldsQry.getPartitions() : null,
567568
exchangeSvc,
568569
(q, ex) -> qryReg.unregister(q.id(), ex),
569570
log,

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public RootQuery(
107107
Object[] params,
108108
QueryContext qryCtx,
109109
boolean isLocal,
110+
int[] parts,
110111
ExchangeService exch,
111112
BiConsumer<Query<RowT>, Throwable> unregister,
112113
IgniteLogger log,
@@ -144,6 +145,7 @@ public RootQuery(
144145
.build()
145146
)
146147
.local(isLocal)
148+
.partitions(parts)
147149
.logger(log)
148150
.build();
149151
}
@@ -157,7 +159,7 @@ public RootQuery(
157159
* @param schema new schema.
158160
*/
159161
public RootQuery<RowT> childQuery(SchemaPlus schema) {
160-
return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), ctx.isLocal(), exch, unregister, log,
162+
return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), ctx.isLocal(), ctx.partitions(), exch, unregister, log,
161163
plannerTimeout, totalTimeout);
162164
}
163165

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,10 @@
7575
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
7676
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
7777
import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
78+
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException;
7879
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
7980
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
81+
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException;
8082
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
8183
import org.apache.ignite.internal.processors.query.calcite.metadata.RemoteException;
8284
import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
@@ -564,6 +566,17 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
564566

565567
List<Fragment> fragments = plan.fragments();
566568

569+
if (!F.isEmpty(qry.context().partitions())) {
570+
fragments = Commons.transform(fragments, f -> {
571+
try {
572+
return f.filterByPartitions(qry.context().partitions());
573+
}
574+
catch (ColocationMappingException e) {
575+
throw new FragmentMappingException("Failed to calculate physical distribution", f, f.root(), e);
576+
}
577+
});
578+
}
579+
567580
// Local execution
568581
Fragment fragment = F.first(fragments);
569582

@@ -576,7 +589,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
576589

577590
List<UUID> nodes = mapping.nodeIds();
578591

579-
assert nodes != null && nodes.size() == 1 && F.first(nodes).equals(localNodeId());
592+
assert nodes != null && nodes.size() == 1 && F.first(nodes).equals(localNodeId())
593+
: "nodes=" + nodes + ", localNode=" + localNodeId();
580594
}
581595

582596
long timeout = qry.remainingTime();

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
4646
import org.apache.ignite.plugin.extensions.communication.MessageReader;
4747
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
48-
import org.jetbrains.annotations.NotNull;
4948

5049
/** */
5150
public class ColocationGroup implements MarshalableMessage {
@@ -114,7 +113,13 @@ public List<UUID> nodeIds() {
114113
* {@link GridDhtPartitionState#OWNING} state, calculated for distributed tables, involved in query execution.
115114
*/
116115
public List<List<UUID>> assignments() {
117-
return assignments == null ? Collections.emptyList() : assignments;
116+
if (assignments != null)
117+
return assignments;
118+
119+
if (!F.isEmpty(nodeIds))
120+
return nodeIds.stream().map(Collections::singletonList).collect(Collectors.toList());
121+
122+
return Collections.emptyList();
118123
}
119124

120125
/**
@@ -208,39 +213,50 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE
208213
}
209214

210215
/** */
211-
public ColocationGroup finalaze() {
212-
if (assignments == null && nodeIds == null)
216+
public ColocationGroup finalizeMapping() {
217+
if (assignments == null)
213218
return this;
214219

215-
if (assignments != null) {
220+
List<List<UUID>> assignments = new ArrayList<>(this.assignments.size());
221+
Set<UUID> nodes = new HashSet<>();
222+
223+
for (List<UUID> assignment : this.assignments) {
224+
UUID first = F.first(assignment);
225+
if (first != null)
226+
nodes.add(first);
227+
assignments.add(first != null ? Collections.singletonList(first) : Collections.emptyList());
228+
}
229+
230+
return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments);
231+
}
232+
233+
/** */
234+
public ColocationGroup filterByPartitions(int[] parts) {
235+
if (!F.isEmpty(assignments)) {
216236
List<List<UUID>> assignments = new ArrayList<>(this.assignments.size());
217237
Set<UUID> nodes = new HashSet<>();
218-
for (List<UUID> assignment : this.assignments) {
219-
UUID first = F.first(assignment);
238+
239+
if (F.isEmpty(parts))
240+
return this;
241+
242+
for (int i = 0; i < this.assignments.size(); ++i) {
243+
UUID first = Arrays.binarySearch(parts, i) >= 0 ? F.first(this.assignments.get(i)) : null;
244+
220245
if (first != null)
221246
nodes.add(first);
222-
assignments.add(first != null ? Collections.singletonList(first) : Collections.emptyList());
247+
248+
assignments.add(first != null ? this.assignments.get(i) : Collections.emptyList());
223249
}
224250

225251
return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments);
226252
}
227253

228-
return forNodes0(nodeIds);
254+
return this;
229255
}
230256

231257
/** */
232258
public ColocationGroup mapToNodes(List<UUID> nodeIds) {
233-
return !F.isEmpty(this.nodeIds) ? this : forNodes0(nodeIds);
234-
}
235-
236-
/** */
237-
@NotNull private ColocationGroup forNodes0(List<UUID> nodeIds) {
238-
List<List<UUID>> assignments = new ArrayList<>(nodeIds.size());
239-
240-
for (UUID nodeId : nodeIds)
241-
assignments.add(Collections.singletonList(nodeId));
242-
243-
return new ColocationGroup(sourceIds, nodeIds, assignments);
259+
return !F.isEmpty(this.nodeIds) ? this : new ColocationGroup(sourceIds, nodeIds, null);
244260
}
245261

246262
/**
@@ -250,6 +266,9 @@ public ColocationGroup mapToNodes(List<UUID> nodeIds) {
250266
* @return List of partitions to scan on the given node.
251267
*/
252268
public int[] partitions(UUID nodeId) {
269+
if (F.isEmpty(assignments))
270+
return null;
271+
253272
GridIntList parts = new GridIntList(assignments.size());
254273

255274
for (int i = 0; i < assignments.size(); i++) {

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,19 +132,31 @@ public List<UUID> nodeIds() {
132132
}
133133

134134
/** */
135-
public FragmentMapping finalize(Supplier<List<UUID>> nodesSource) {
135+
public FragmentMapping finalizeMapping(Supplier<List<UUID>> nodesSource) {
136136
if (colocationGroups.isEmpty())
137137
return this;
138138

139139
List<ColocationGroup> colocationGroups = this.colocationGroups;
140140

141-
colocationGroups = Commons.transform(colocationGroups, ColocationGroup::finalaze);
141+
colocationGroups = Commons.transform(colocationGroups, ColocationGroup::finalizeMapping);
142142
List<UUID> nodes = nodeIds(), nodes0 = nodes.isEmpty() ? nodesSource.get() : nodes;
143143
colocationGroups = Commons.transform(colocationGroups, g -> g.mapToNodes(nodes0));
144144

145145
return new FragmentMapping(colocationGroups);
146146
}
147147

148+
/** */
149+
public FragmentMapping filterByPartitions(int[] parts) throws ColocationMappingException {
150+
List<ColocationGroup> colocationGroups = this.colocationGroups;
151+
152+
if (!F.isEmpty(parts) && colocationGroups.size() > 1)
153+
throw new ColocationMappingException("Execution of non-collocated query with partition parameter is not possible");
154+
155+
colocationGroups = Commons.transform(colocationGroups, g -> g.filterByPartitions(parts));
156+
157+
return new FragmentMapping(colocationGroups);
158+
}
159+
148160
/** */
149161
public @NotNull ColocationGroup findGroup(long sourceId) {
150162
List<ColocationGroup> groups = colocationGroups.stream()

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.internal.processors.query.calcite.prepare;
1919

2020
import java.lang.reflect.Method;
21+
import java.util.Arrays;
2122
import java.util.List;
2223
import java.util.Properties;
2324
import com.google.common.collect.Multimap;
@@ -159,14 +160,18 @@ public final class BaseQueryContext extends AbstractQueryContext {
159160
/** */
160161
private final boolean isLocal;
161162

163+
/** */
164+
private final int[] parts;
165+
162166
/**
163167
* Private constructor, used by a builder.
164168
*/
165169
private BaseQueryContext(
166170
FrameworkConfig cfg,
167171
Context parentCtx,
168172
IgniteLogger log,
169-
boolean isLocal
173+
boolean isLocal,
174+
int[] parts
170175
) {
171176
super(Contexts.chain(parentCtx, cfg.getContext()));
172177

@@ -177,6 +182,8 @@ private BaseQueryContext(
177182

178183
this.isLocal = isLocal;
179184

185+
this.parts = parts;
186+
180187
qryCancel = unwrap(GridQueryCancel.class);
181188

182189
typeFactory = TYPE_FACTORY;
@@ -276,6 +283,14 @@ public boolean isLocal() {
276283
return isLocal;
277284
}
278285

286+
/** */
287+
public int[] partitions() {
288+
if (parts != null)
289+
return Arrays.copyOf(parts, parts.length);
290+
291+
return null;
292+
}
293+
279294
/**
280295
* Query context builder.
281296
*/
@@ -299,6 +314,9 @@ public static class Builder {
299314
/** */
300315
private boolean isLocal = false;
301316

317+
/** */
318+
private int[] parts = null;
319+
302320
/**
303321
* @param frameworkCfg Framework config.
304322
* @return Builder for chaining.
@@ -335,13 +353,24 @@ public Builder local(boolean isLocal) {
335353
return this;
336354
}
337355

356+
/**
357+
* @param parts Array of partitions' numbers.
358+
* @return Builder for chaining.
359+
*/
360+
public Builder partitions(int[] parts) {
361+
if (parts != null)
362+
this.parts = Arrays.copyOf(parts, parts.length);
363+
364+
return this;
365+
}
366+
338367
/**
339368
* Builds planner context.
340369
*
341370
* @return Planner context.
342371
*/
343372
public BaseQueryContext build() {
344-
return new BaseQueryContext(frameworkCfg, parentCtx, log, isLocal);
373+
return new BaseQueryContext(frameworkCfg, parentCtx, log, isLocal, parts);
345374
}
346375
}
347376
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ public Fragment attach(RelOptCluster cluster) {
125125
return root.getCluster() == cluster ? this : new Cloner(cluster).go(this);
126126
}
127127

128+
/** */
129+
public Fragment filterByPartitions(int[] parts) throws ColocationMappingException {
130+
return new Fragment(id, root, remotes, rootSer, mapping.filterByPartitions(parts));
131+
}
132+
128133
/**
129134
* Mapps the fragment to its data location.
130135
* @param ctx Planner context.
@@ -157,7 +162,7 @@ private FragmentMapping mapping(MappingQueryContext ctx, RelMetadataQuery mq, Su
157162
.get(ThreadLocalRandom.current().nextInt(mapping.nodeIds().size()))).colocate(mapping);
158163
}
159164

160-
return mapping.finalize(nodesSource);
165+
return mapping.finalizeMapping(nodesSource);
161166
}
162167
catch (NodeMappingException e) {
163168
throw new FragmentMappingException("Failed to calculate physical distribution", this, e.node(), e);

0 commit comments

Comments
 (0)