Skip to content

Commit 3485b92

Browse files
authored
IGNITE-12808 Allow register started caches in indexing to enable SQL query on them. (apache#7627)
This patch introduce support of the `CREATE TABLE T1 ... WITH 'cache_name=some-cache` where "some-cache" is the existing cache name.
1 parent 1e84d44 commit 3485b92

28 files changed

+2145
-208
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ public CacheMetricsImpl(GridCacheContext<?, ?> cctx, boolean isNear) {
351351
"Number of partitions need to be cleared before actual rebalance start.");
352352

353353
mreg.register("IsIndexRebuildInProgress", () -> {
354-
IgniteInternalFuture fut = cctx.shared().database().indexRebuildFuture(cctx.cacheId());
354+
IgniteInternalFuture fut = cctx.shared().kernalContext().query().indexRebuildFuture(cctx.cacheId());
355355

356356
return fut != null && !fut.isDone();
357357
}, "True if index rebuild is in progress.");

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2017,7 +2017,7 @@ private String processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID node
20172017
else if (!active && isMergeConfigSupport) {
20182018
DynamicCacheDescriptor desc = registeredCaches.get(cfg.getName());
20192019

2020-
QuerySchemaPatch schemaPatch = desc.makeSchemaPatch(cacheInfo.cacheData().queryEntities());
2020+
QuerySchemaPatch schemaPatch = desc.makeSchemaPatch(cacheInfo.cacheData());
20212021

20222022
if (schemaPatch.hasConflicts()) {
20232023
hasSchemaPatchConflict = true;

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.ignite.internal.processors.query.QuerySchema;
3030
import org.apache.ignite.internal.processors.query.QuerySchemaPatch;
3131
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
32+
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
33+
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation;
3234
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
3335
import org.apache.ignite.internal.util.typedef.T2;
3436
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -363,6 +365,24 @@ public void schema(QuerySchema schema) {
363365
public void schemaChangeFinish(SchemaFinishDiscoveryMessage msg) {
364366
synchronized (schemaMux) {
365367
schema.finish(msg);
368+
369+
if (msg.operation() instanceof SchemaAddQueryEntityOperation) {
370+
cacheCfg = GridCacheUtils.patchCacheConfiguration(cacheCfg,
371+
(SchemaAddQueryEntityOperation)msg.operation());
372+
}
373+
}
374+
}
375+
376+
/**
377+
* Make schema patch for this cache.
378+
*
379+
* @param cacheData Stored cache by which current schema should be expanded.
380+
* @return Patch which contains operations for expanding schema of this cache.
381+
* @see QuerySchemaPatch
382+
*/
383+
public QuerySchemaPatch makeSchemaPatch(StoredCacheData cacheData) {
384+
synchronized (schemaMux) {
385+
return schema.makePatch(cacheData.config(), cacheData.queryEntities());
366386
}
367387
}
368388

@@ -387,7 +407,16 @@ public QuerySchemaPatch makeSchemaPatch(Collection<QueryEntity> target) {
387407
*/
388408
public boolean applySchemaPatch(QuerySchemaPatch patch) {
389409
synchronized (schemaMux) {
390-
return schema.applyPatch(patch);
410+
boolean res = schema.applyPatch(patch);
411+
412+
if (res) {
413+
for (SchemaAbstractOperation op: patch.getPatchOperations()) {
414+
if (op instanceof SchemaAddQueryEntityOperation)
415+
cacheCfg = GridCacheUtils.patchCacheConfiguration(cacheCfg, (SchemaAddQueryEntityOperation)op);
416+
}
417+
}
418+
419+
return res;
391420
}
392421
}
393422

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.ignite.binary.BinaryObjectBuilder;
4949
import org.apache.ignite.cache.CacheInterceptor;
5050
import org.apache.ignite.cache.CacheMode;
51+
import org.apache.ignite.cache.QueryEntity;
5152
import org.apache.ignite.cluster.ClusterNode;
5253
import org.apache.ignite.configuration.CacheConfiguration;
5354
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -95,6 +96,7 @@
9596
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
9697
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheManager;
9798
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
99+
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation;
98100
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
99101
import org.apache.ignite.internal.util.F0;
100102
import org.apache.ignite.internal.util.lang.GridFunc;
@@ -157,7 +159,7 @@ public class GridCacheContext<K, V> implements Externalizable {
157159
private IgniteLogger log;
158160

159161
/** Cache configuration. */
160-
private CacheConfiguration cacheCfg;
162+
private volatile CacheConfiguration cacheCfg;
161163

162164
/** Affinity manager. */
163165
private GridCacheAffinityManager affMgr;
@@ -2350,6 +2352,39 @@ public boolean hasContinuousQueryListeners(@Nullable IgniteInternalTx tx) {
23502352
contQryMgr.notifyContinuousQueries(tx) && !F.isEmpty(contQryMgr.updateListeners(false, false));
23512353
}
23522354

2355+
/**
2356+
* Apply changes from {@link SchemaAddQueryEntityOperation}.
2357+
*
2358+
* @param op Add query entity schema operation.
2359+
*/
2360+
public void onSchemaAddQueryEntity(SchemaAddQueryEntityOperation op) {
2361+
onSchemaAddQueryEntity(op.entities(), op.schemaName(), op.isSqlEscape(),
2362+
op.queryParallelism());
2363+
}
2364+
2365+
/**
2366+
* Apply changes on enable indexing.
2367+
*
2368+
* @param entities New query entities.
2369+
* @param sqlSchema Sql schema name.
2370+
* @param isSqlEscape Sql escape flag.
2371+
* @param qryParallelism Query parallelism parameter.
2372+
*/
2373+
public void onSchemaAddQueryEntity(
2374+
Collection<QueryEntity> entities,
2375+
String sqlSchema,
2376+
boolean isSqlEscape,
2377+
int qryParallelism
2378+
) {
2379+
CacheConfiguration oldCfg = cacheCfg;
2380+
2381+
if (oldCfg != null)
2382+
cacheCfg = GridCacheUtils.patchCacheConfiguration(oldCfg, entities, sqlSchema, isSqlEscape, qryParallelism);
2383+
2384+
if (qryMgr != null)
2385+
qryMgr.enable();
2386+
}
2387+
23532388
/**
23542389
* Returns future that assigned to last performing {@link GlobalRemoveAllJob}.
23552390
*/

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.internal.processors.cache;
1919

2020
import org.apache.ignite.configuration.CacheConfiguration;
21+
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation;
2122
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
2223
import org.apache.ignite.internal.util.typedef.internal.CU;
2324
import org.apache.ignite.lang.IgniteUuid;
@@ -35,7 +36,7 @@ public class GridCacheContextInfo<K, V> {
3536
private final IgniteUuid dynamicDeploymentId;
3637

3738
/** Cache configuration. */
38-
private final CacheConfiguration config;
39+
private volatile CacheConfiguration<K, V> config;
3940

4041
/** Cache group ID. */
4142
private final int groupId;
@@ -44,7 +45,7 @@ public class GridCacheContextInfo<K, V> {
4445
private final int cacheId;
4546

4647
/** Full cache context. Can be {@code null} in case a cache is not started. */
47-
@Nullable private volatile GridCacheContext cctx;
48+
@Nullable private volatile GridCacheContext<K, V> cctx;
4849

4950
/**
5051
* Constructor of full cache context.
@@ -80,7 +81,7 @@ public GridCacheContextInfo(DynamicCacheDescriptor cacheDesc) {
8081
/**
8182
* @return Cache configuration.
8283
*/
83-
public CacheConfiguration config() {
84+
public CacheConfiguration<K, V> config() {
8485
return config;
8586
}
8687

@@ -115,15 +116,15 @@ public boolean affinityNode() {
115116
/**
116117
* @return Cache context. {@code null} for not started cache.
117118
*/
118-
@Nullable public GridCacheContext cacheContext() {
119+
@Nullable public GridCacheContext<K, V> cacheContext() {
119120
return cctx;
120121
}
121122

122123
/**
123124
* @return Dynamic deployment ID.
124125
*/
125126
public IgniteUuid dynamicDeploymentId() {
126-
GridCacheContext cctx0 = cctx;
127+
GridCacheContext<K, V> cctx0 = cctx;
127128

128129
if (cctx0 != null)
129130
return cctx0.dynamicDeploymentId();
@@ -138,7 +139,7 @@ public IgniteUuid dynamicDeploymentId() {
138139
*
139140
* @param cctx Initted cache context.
140141
*/
141-
public void initCacheContext(GridCacheContext<?, ?> cctx) {
142+
public void initCacheContext(GridCacheContext<K, V> cctx) {
142143
assert this.cctx == null : this.cctx;
143144
assert cctx != null;
144145

@@ -167,6 +168,24 @@ public boolean isCacheContextInited() {
167168
return cctx != null;
168169
}
169170

171+
/**
172+
* Apply changes from {@link SchemaAddQueryEntityOperation}.
173+
*
174+
* @param op Add query entity schema operation.
175+
*/
176+
public void onSchemaAddQueryEntity(SchemaAddQueryEntityOperation op) {
177+
if (cctx != null) {
178+
cctx.onSchemaAddQueryEntity(op);
179+
180+
config = cctx.config();
181+
}
182+
else {
183+
CacheConfiguration<K, V> oldCfg = config;
184+
185+
config = GridCacheUtils.patchCacheConfiguration(oldCfg, op);
186+
}
187+
}
188+
170189
/** {@inheritDoc} */
171190
@Override public String toString() {
172191
return "GridCacheContextInfo: " + name() + " " + (isCacheContextInited() ? "started" : "not started");

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -924,15 +924,23 @@ private void stopCacheOnReconnect(GridCacheContext cctx, List<GridCacheAdapter>
924924
reconnected.add(cache);
925925

926926
if (cache.context().userCache()) {
927-
// Re-create cache structures inside indexing in order to apply recent schema changes.
928-
GridCacheContextInfo cacheInfo = new GridCacheContextInfo(cache.context(), false);
927+
DynamicCacheDescriptor desc = cacheDescriptor(cache.name());
929928

930-
DynamicCacheDescriptor desc = cacheDescriptor(cacheInfo.name());
929+
assert desc != null : cache.name();
931930

932-
assert desc != null : cacheInfo.name();
931+
if (!QueryUtils.isEnabled(cache.context().config())
932+
&& QueryUtils.isEnabled(desc.cacheConfiguration())) {
933+
CacheConfiguration newCfg = desc.cacheConfiguration();
934+
935+
cache.context().onSchemaAddQueryEntity(newCfg.getQueryEntities(), newCfg.getSqlSchema(),
936+
newCfg.isSqlEscapeAll(), newCfg.getQueryParallelism());
937+
}
933938

934939
boolean rmvIdx = !cache.context().group().persistenceEnabled();
935940

941+
// Re-create cache structures inside indexing in order to apply recent schema changes.
942+
GridCacheContextInfo cacheInfo = new GridCacheContextInfo(cache.context(), false);
943+
936944
ctx.query().onCacheStop0(cacheInfo, rmvIdx);
937945
ctx.query().onCacheStart0(cacheInfo, desc.schema(), desc.sql());
938946
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
8484
import org.apache.ignite.internal.processors.query.QueryUtils;
8585
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
86+
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation;
8687
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
8788
import org.apache.ignite.internal.util.lang.GridClosureException;
8889
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
@@ -2121,6 +2122,43 @@ public static boolean isCacheTemplateName(String cacheName) {
21212122
return cacheName.endsWith("*");
21222123
}
21232124

2125+
/**
2126+
* Patch cache configuration with {@link SchemaAddQueryEntityOperation}.
2127+
*
2128+
* @param oldCfg Old cache config.
2129+
* @param op Schema add query entity operation.
2130+
*/
2131+
public static <K, V> CacheConfiguration<K, V> patchCacheConfiguration(
2132+
CacheConfiguration<K, V> oldCfg,
2133+
SchemaAddQueryEntityOperation op
2134+
) {
2135+
return patchCacheConfiguration(oldCfg, op.entities(), op.schemaName(), op.isSqlEscape(),
2136+
op.queryParallelism());
2137+
}
2138+
2139+
/**
2140+
* Patch cache configuration with {@link SchemaAddQueryEntityOperation}.
2141+
*
2142+
* @param oldCfg Old cache config.
2143+
* @param entities New query entities.
2144+
* @param sqlSchema Sql schema name.
2145+
* @param isSqlEscape Sql escape flag.
2146+
* @param qryParallelism Query parallelism parameter.
2147+
*/
2148+
public static <K, V> CacheConfiguration<K, V> patchCacheConfiguration(
2149+
CacheConfiguration<K, V> oldCfg,
2150+
Collection<QueryEntity> entities,
2151+
String sqlSchema,
2152+
boolean isSqlEscape,
2153+
int qryParallelism
2154+
) {
2155+
return new CacheConfiguration<>(oldCfg)
2156+
.setQueryEntities(entities)
2157+
.setSqlSchema(sqlSchema)
2158+
.setSqlEscapeAll(isSqlEscape)
2159+
.setQueryParallelism(qryParallelism);
2160+
}
2161+
21242162
/**
21252163
*
21262164
*/

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2223,7 +2223,7 @@ private <R> void setFuture(IgniteInternalFuture<R> fut) {
22232223
@Override public IgniteFuture<?> indexReadyFuture() {
22242224
GridCacheContext<K, V> ctx = getContextSafe();
22252225

2226-
IgniteInternalFuture fut = ctx.shared().database().indexRebuildFuture(ctx.cacheId());
2226+
IgniteInternalFuture fut = ctx.shared().kernalContext().query().indexRebuildFuture(ctx.cacheId());
22272227

22282228
if (fut == null)
22292229
return new IgniteFinishedFutureImpl<>();

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,16 @@
9292
* Util class for joining node validation.
9393
*/
9494
public class ValidationOnNodeJoinUtils {
95-
/** Template of message of conflicts during configuration merge */
95+
/** Template of message of conflicts of sql schema name. */
96+
private static final String SQL_SCHEMA_CONFLICTS_MESSAGE =
97+
"Failed to join node to the active cluster, configuration conflict for cache '%s': " +
98+
"schema '%s' from joining node differs to '%s'";
99+
100+
/** Template of message of conflicts during configuration merge. */
96101
private static final String MERGE_OF_CONFIG_CONFLICTS_MESSAGE =
97102
"Conflicts during configuration merge for cache '%s' : \n%s";
98103

99-
/** Template of message of node join was fail because it requires to merge of config */
104+
/** Template of message of node join was fail because it requires to merge of config. */
100105
private static final String MERGE_OF_CONFIG_REQUIRED_MESSAGE = "Failed to join node to the active cluster " +
101106
"(the config of the cache '%s' has to be merged which is impossible on active grid). " +
102107
"Deactivate grid and retry node join or clean the joining node.";
@@ -173,15 +178,29 @@ public class ValidationOnNodeJoinUtils {
173178
if (locDesc == null)
174179
continue;
175180

176-
QuerySchemaPatch schemaPatch = locDesc.makeSchemaPatch(cacheInfo.cacheData().queryEntities());
181+
String joinedSchema = cacheInfo.cacheData().config().getSqlSchema();
182+
Collection<QueryEntity> joinedQryEntities = cacheInfo.cacheData().queryEntities();
183+
String locSchema = locDesc.cacheConfiguration().getSqlSchema();
184+
185+
// Peform checks of SQL schema. If schemas' names not equal, only valid case is if local or joined
186+
// QuerySchema is empty and schema name is null (when indexing enabled dynamically).
187+
if (!F.eq(joinedSchema, locSchema)
188+
&& (locSchema != null || !locDesc.schema().isEmpty())
189+
&& (joinedSchema != null || !F.isEmpty(joinedQryEntities))) {
190+
errorMsg.append(String.format(SQL_SCHEMA_CONFLICTS_MESSAGE, locDesc.cacheName(), joinedSchema,
191+
locSchema));
192+
}
193+
194+
QuerySchemaPatch schemaPatch = locDesc.makeSchemaPatch(joinedQryEntities);
177195

178196
if (schemaPatch.hasConflicts() || (isGridActive && !schemaPatch.isEmpty())) {
179197
if (errorMsg.length() > 0)
180198
errorMsg.append("\n");
181199

182-
if (schemaPatch.hasConflicts())
183-
errorMsg.append(String.format(MERGE_OF_CONFIG_CONFLICTS_MESSAGE,
184-
locDesc.cacheName(), schemaPatch.getConflictsMessage()));
200+
if (schemaPatch.hasConflicts()) {
201+
errorMsg.append(String.format(MERGE_OF_CONFIG_CONFLICTS_MESSAGE, locDesc.cacheName(),
202+
schemaPatch.getConflictsMessage()));
203+
}
185204
else
186205
errorMsg.append(String.format(MERGE_OF_CONFIG_REQUIRED_MESSAGE, locDesc.cacheName()));
187206
}

0 commit comments

Comments
 (0)