Skip to content

Commit adec3e7

Browse files
alexpaschenkodevozerov
authored andcommitted
IGNITE-5572: SQL: ALTER TABLE ADD COLUMN support. This closes apache#2344.
1 parent ffaf108 commit adec3e7

28 files changed

+2841
-124
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,21 @@ public void dynamicIndexCreate(String schemaName, String tblName, QueryIndexDesc
164164
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
165165
public void dynamicIndexDrop(String schemaName, String idxName, boolean ifExists) throws IgniteCheckedException;
166166

167+
/**
168+
* Add columns to dynamic table.
169+
*
170+
* @param schemaName Schema name.
171+
* @param tblName Table name.
172+
* @param cols Columns to add.
173+
* @param ifTblExists Ignore operation if target table does not exist (instead of throwing an error).
174+
* @param ifColNotExists Ignore operation if column already exists (instead of throwing an error) - is honored only
175+
* for single column case.
176+
* @throws IgniteCheckedException If failed.
177+
*/
178+
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
179+
public void dynamicAddColumn(String schemaName, String tblName, List<QueryField> cols, boolean ifTblExists,
180+
boolean ifColNotExists) throws IgniteCheckedException;
181+
167182
/**
168183
* Registers cache.
169184
*

modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java

Lines changed: 125 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
7070
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
7171
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
72+
import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty;
7273
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
7374
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
7475
import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
@@ -81,6 +82,7 @@
8182
import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
8283
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
8384
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
85+
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAlterTableAddColumnOperation;
8486
import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation;
8587
import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexDropOperation;
8688
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -754,6 +756,16 @@ else if (op0 instanceof SchemaIndexDropOperation) {
754756

755757
QueryUtils.processDynamicIndexChange(opDrop.indexName(), null, typeDesc);
756758
}
759+
else if (op0 instanceof SchemaAlterTableAddColumnOperation) {
760+
SchemaAlterTableAddColumnOperation opAddCol =
761+
(SchemaAlterTableAddColumnOperation)op0;
762+
763+
QueryTypeDescriptorImpl typeDesc = tblTypMap.get(opAddCol.tableName());
764+
765+
assert typeDesc != null;
766+
767+
processDynamicAddColumn(typeDesc, opAddCol.columns());
768+
}
757769
else
758770
assert false;
759771
}
@@ -966,6 +978,32 @@ else if (op instanceof SchemaIndexDropOperation) {
966978
else
967979
type = oldIdx.typeDescriptor();
968980
}
981+
else if (op instanceof SchemaAlterTableAddColumnOperation) {
982+
SchemaAlterTableAddColumnOperation op0 = (SchemaAlterTableAddColumnOperation)op;
983+
984+
type = type(cacheName, op0.tableName());
985+
986+
if (type == null) {
987+
if (op0.ifTableExists())
988+
nop = true;
989+
else
990+
err = new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND,
991+
op0.tableName());
992+
}
993+
else {
994+
for (QueryField col : op0.columns()) {
995+
if (type.hasField(col.name())) {
996+
if (op0.ifNotExists()) {
997+
assert op0.columns().size() == 1;
998+
999+
nop = true;
1000+
}
1001+
else
1002+
err = new SchemaOperationException(SchemaOperationException.CODE_COLUMN_EXISTS, col.name());
1003+
}
1004+
}
1005+
}
1006+
}
9691007
else
9701008
err = new SchemaOperationException("Unsupported operation: " + op);
9711009

@@ -1070,6 +1108,32 @@ else if (op instanceof SchemaIndexDropOperation) {
10701108
err = new SchemaOperationException(SchemaOperationException.CODE_INDEX_NOT_FOUND, idxName);
10711109
}
10721110
}
1111+
else if (op instanceof SchemaAlterTableAddColumnOperation) {
1112+
SchemaAlterTableAddColumnOperation op0 = (SchemaAlterTableAddColumnOperation)op;
1113+
1114+
QueryEntity e = tblMap.get(op0.tableName());
1115+
1116+
if (e == null) {
1117+
if (op0.ifTableExists())
1118+
nop = true;
1119+
else
1120+
err = new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND,
1121+
op0.tableName());
1122+
}
1123+
else {
1124+
for (QueryField fld : op0.columns()) {
1125+
if (e.getFields().containsKey(fld.name())) {
1126+
if (op0.ifNotExists()) {
1127+
assert op0.columns().size() == 1;
1128+
1129+
nop = true;
1130+
}
1131+
else
1132+
err = new SchemaOperationException(SchemaOperationException.CODE_COLUMN_EXISTS, fld.name());
1133+
}
1134+
}
1135+
}
1136+
}
10731137
else
10741138
err = new SchemaOperationException("Unsupported operation: " + op);
10751139

@@ -1180,9 +1244,7 @@ public void onLocalOperationFinished(SchemaAbstractOperation op, @Nullable Query
11801244

11811245
idxs.put(idxKey, idxDesc);
11821246
}
1183-
else {
1184-
assert op instanceof SchemaIndexDropOperation;
1185-
1247+
else if (op instanceof SchemaIndexDropOperation) {
11861248
SchemaIndexDropOperation op0 = (SchemaIndexDropOperation) op;
11871249

11881250
QueryUtils.processDynamicIndexChange(op0.indexName(), null, type);
@@ -1191,6 +1253,12 @@ public void onLocalOperationFinished(SchemaAbstractOperation op, @Nullable Query
11911253

11921254
idxs.remove(idxKey);
11931255
}
1256+
else {
1257+
assert op instanceof SchemaAlterTableAddColumnOperation;
1258+
1259+
// No-op - all processing is done at "local" stage
1260+
// as we must update both table and type descriptor atomically.
1261+
}
11941262
}
11951263
catch (IgniteCheckedException e) {
11961264
U.warn(log, "Failed to finish index operation [opId=" + op.id() + " op=" + op + ']', e);
@@ -1229,15 +1297,16 @@ public void onNodeLeave(ClusterNode node) {
12291297
}
12301298

12311299
/**
1232-
* Process index operation.
1300+
* Process schema operation.
12331301
*
12341302
* @param op Operation.
12351303
* @param type Type descriptor.
12361304
* @param depId Cache deployment ID.
12371305
* @param cancelTok Cancel token.
12381306
* @throws SchemaOperationException If failed.
12391307
*/
1240-
public void processIndexOperationLocal(SchemaAbstractOperation op, QueryTypeDescriptorImpl type, IgniteUuid depId,
1308+
@SuppressWarnings("StatementWithEmptyBody")
1309+
public void processSchemaOperationLocal(SchemaAbstractOperation op, QueryTypeDescriptorImpl type, IgniteUuid depId,
12411310
SchemaIndexOperationCancellationToken cancelTok) throws SchemaOperationException {
12421311
if (log.isDebugEnabled())
12431312
log.debug("Started local index operation [opId=" + op.id() + ']');
@@ -1251,7 +1320,7 @@ public void processIndexOperationLocal(SchemaAbstractOperation op, QueryTypeDesc
12511320

12521321
try {
12531322
if (op instanceof SchemaIndexCreateOperation) {
1254-
SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation) op;
1323+
SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
12551324

12561325
QueryIndexDescriptorImpl idxDesc = QueryUtils.createIndexDescriptor(type, op0.index());
12571326

@@ -1261,10 +1330,18 @@ public void processIndexOperationLocal(SchemaAbstractOperation op, QueryTypeDesc
12611330
idx.dynamicIndexCreate(op0.schemaName(), op0.tableName(), idxDesc, op0.ifNotExists(), visitor);
12621331
}
12631332
else if (op instanceof SchemaIndexDropOperation) {
1264-
SchemaIndexDropOperation op0 = (SchemaIndexDropOperation) op;
1333+
SchemaIndexDropOperation op0 = (SchemaIndexDropOperation)op;
12651334

12661335
idx.dynamicIndexDrop(op0.schemaName(), op0.indexName(), op0.ifExists());
12671336
}
1337+
else if (op instanceof SchemaAlterTableAddColumnOperation) {
1338+
SchemaAlterTableAddColumnOperation op0 = (SchemaAlterTableAddColumnOperation)op;
1339+
1340+
processDynamicAddColumn(type, op0.columns());
1341+
1342+
idx.dynamicAddColumn(op0.schemaName(), op0.tableName(), op0.columns(), op0.ifTableExists(),
1343+
op0.ifNotExists());
1344+
}
12681345
else
12691346
throw new SchemaOperationException("Unsupported operation: " + op);
12701347
}
@@ -2062,6 +2139,23 @@ public IgniteInternalFuture<?> dynamicIndexDrop(String cacheName, String schemaN
20622139
return startIndexOperationDistributed(op);
20632140
}
20642141

2142+
/**
2143+
* Entry point for add column procedure.
2144+
* @param schemaName Schema name.
2145+
* @param tblName Target table name.
2146+
* @param cols Columns to add.
2147+
* @param ifTblExists Ignore operation if target table doesn't exist.
2148+
* @param ifNotExists Ignore operation if column exists.
2149+
*/
2150+
public IgniteInternalFuture<?> dynamicColumnAdd(String cacheName, String schemaName, String tblName,
2151+
List<QueryField> cols, boolean ifTblExists, boolean ifNotExists) {
2152+
2153+
SchemaAlterTableAddColumnOperation op = new SchemaAlterTableAddColumnOperation(UUID.randomUUID(), cacheName,
2154+
schemaName, tblName, cols, ifTblExists, ifNotExists);
2155+
2156+
return startIndexOperationDistributed(op);
2157+
}
2158+
20652159
/**
20662160
* Start distributed index change operation.
20672161
*
@@ -2129,6 +2223,30 @@ private void sendQueryExecutedEvent(String sqlQry, Object[] params, String cache
21292223
}
21302224
}
21312225

2226+
/**
2227+
* Update type descriptor with new fields metadata.
2228+
*
2229+
* @param d Type descriptor to update.
2230+
* @param cols Columns to add.
2231+
* @throws IgniteCheckedException If failed to update type descriptor.
2232+
*/
2233+
private void processDynamicAddColumn(QueryTypeDescriptorImpl d, List<QueryField> cols) throws IgniteCheckedException {
2234+
List<GridQueryProperty> props = new ArrayList<>(cols.size());
2235+
2236+
for (QueryField col : cols) {
2237+
try {
2238+
props.add(new QueryBinaryProperty(ctx, col.name(), null, Class.forName(col.typeName()),
2239+
false, null));
2240+
}
2241+
catch (ClassNotFoundException e) {
2242+
throw new SchemaOperationException("Class not found for new property: " + col.typeName());
2243+
}
2244+
}
2245+
2246+
for (GridQueryProperty p : props)
2247+
d.addProperty(p, true);
2248+
}
2249+
21322250
/**
21332251
*
21342252
* @param cacheName Cache name.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.query;
19+
20+
import org.apache.ignite.internal.util.typedef.internal.S;
21+
22+
import java.io.Serializable;
23+
24+
/**
25+
* Query field metadata.
26+
*/
27+
public class QueryField implements Serializable {
28+
/** */
29+
private static final long serialVersionUID = 0L;
30+
31+
/** Field name. */
32+
private final String name;
33+
34+
/** Class name for this field's values. */
35+
private final String typeName;
36+
37+
/**
38+
* @param name Field name.
39+
* @param typeName Class name for this field's values.
40+
*/
41+
public QueryField(String name, String typeName) {
42+
this.name = name;
43+
this.typeName = typeName;
44+
}
45+
46+
/**
47+
* @return Field name.
48+
*/
49+
public String name() {
50+
return name;
51+
}
52+
53+
/**
54+
* @return Class name for this field's values.
55+
*/
56+
public String typeName() {
57+
return typeName;
58+
}
59+
60+
/** {@inheritDoc} */
61+
@Override public String toString() {
62+
return S.toString(QueryField.class, this);
63+
}
64+
}

modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,21 @@
1717

1818
package org.apache.ignite.internal.processors.query;
1919

20+
import java.io.Serializable;
21+
import java.util.ArrayList;
22+
import java.util.Collection;
23+
import java.util.LinkedList;
24+
import java.util.List;
2025
import org.apache.ignite.cache.QueryEntity;
2126
import org.apache.ignite.cache.QueryIndex;
2227
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
2328
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
29+
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAlterTableAddColumnOperation;
2430
import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation;
2531
import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexDropOperation;
2632
import org.apache.ignite.internal.util.typedef.F;
2733
import org.apache.ignite.internal.util.typedef.internal.S;
2834

29-
import java.io.Serializable;
30-
import java.util.ArrayList;
31-
import java.util.Collection;
32-
import java.util.LinkedList;
33-
import java.util.List;
34-
3535
/**
3636
* Dynamic cache schema.
3737
*/
@@ -118,9 +118,7 @@ public void finish(SchemaFinishDiscoveryMessage msg) {
118118
}
119119
}
120120
}
121-
else {
122-
assert op instanceof SchemaIndexDropOperation;
123-
121+
else if (op instanceof SchemaIndexDropOperation) {
124122
SchemaIndexDropOperation op0 = (SchemaIndexDropOperation)op;
125123

126124
for (QueryEntity entity : entities) {
@@ -147,6 +145,27 @@ public void finish(SchemaFinishDiscoveryMessage msg) {
147145
}
148146
}
149147
}
148+
else {
149+
assert op instanceof SchemaAlterTableAddColumnOperation;
150+
151+
SchemaAlterTableAddColumnOperation op0 = (SchemaAlterTableAddColumnOperation)op;
152+
153+
QueryEntity target = null;
154+
155+
for (QueryEntity entity : entities()) {
156+
if (F.eq(entity.getTableName(), op0.tableName())) {
157+
target = entity;
158+
159+
break;
160+
}
161+
}
162+
163+
if (target == null)
164+
return;
165+
166+
for (QueryField field : op0.columns())
167+
target.getFields().put(field.name(), field.typeName());
168+
}
150169
}
151170
}
152171

0 commit comments

Comments
 (0)