@@ -273,6 +273,42 @@ TEST_F(ClusterExchangeTest, RenamesAreEligibleForExchange) {
273273 future.timed_get (kFutureTimeout );
274274}
275275
276+ TEST_F (ClusterExchangeTest, MatchesAreEligibleForExchange) {
277+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
278+ setupNShards (2 );
279+ loadRoutingTableWithTwoChunksAndTwoShards (kTestOutNss );
280+
281+ auto mergePipe = unittest::assertGet (Pipeline::create (
282+ {parse (" {$group: {_id: '$x', $doingMerge: true}}" ),
283+ parse (" {$match: {_id: {$gte: 0}}}" ),
284+ DocumentSourceOut::create (kTestOutNss , expCtx (), WriteModeEnum::kModeInsertDocuments )},
285+ expCtx ()));
286+
287+ auto future = launchAsync ([&] {
288+ auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange (
289+ operationContext (), mergePipe.get ());
290+ ASSERT_TRUE (exchangeSpec);
291+ ASSERT (exchangeSpec->policy == ExchangePolicyEnum::kRange );
292+ ASSERT_TRUE (exchangeSpec->shardDistributionInfo );
293+ const auto & partitions = exchangeSpec->shardDistributionInfo ->partitions ;
294+ ASSERT_EQ (partitions.size (), 2UL ); // One for each shard.
295+
296+ auto shard0Ranges = partitions.find (" 0" );
297+ ASSERT (shard0Ranges != partitions.end ());
298+ ASSERT_EQ (shard0Ranges->second .size (), 1UL );
299+ auto shard0Range = shard0Ranges->second [0 ];
300+ ASSERT (shard0Range == ChunkRange (BSON (" _id" << MINKEY), BSON (" _id" << 0 )));
301+
302+ auto shard1Ranges = partitions.find (" 1" );
303+ ASSERT (shard1Ranges != partitions.end ());
304+ ASSERT_EQ (shard1Ranges->second .size (), 1UL );
305+ auto shard1Range = shard1Ranges->second [0 ];
306+ ASSERT (shard1Range == ChunkRange (BSON (" _id" << 0 ), BSON (" _id" << MAXKEY)));
307+ });
308+
309+ future.timed_get (kFutureTimeout );
310+ }
311+
276312TEST_F (ClusterExchangeTest, SortThenGroupIsEligibleForExchange) {
277313 // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
278314 setupNShards (2 );
0 commit comments