Skip to content

Commit 62c560a

Browse files
daradurvsnizhikov
authored andcommitted
IGNITE-9607: Service Grid redesign - Phase 1 - Fixes apache#4434.
Signed-off-by: Nikolay Izhikov <[email protected]>
1 parent 457090a commit 62c560a

File tree

83 files changed

+6919
-450
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+6919
-450
lines changed

modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,6 +1058,19 @@ public final class IgniteSystemProperties {
10581058
*/
10591059
public static final String IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE = "IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE";
10601060

1061+
/**
1062+
* Manages the type of the implementation of the service processor (implementation of the {@link IgniteServices}).
1063+
* All nodes in the cluster must have the same value of this property.
1064+
* <p/>
1065+
* If the property is {@code true} then event-driven implementation of the service processor will be used.
1066+
* <p/>
1067+
* If the property is {@code false} then internal cache based implementation of service processor will be used.
1068+
* <p/>
1069+
* Default is {@code true}.
1070+
*/
1071+
public static final String IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED
1072+
= "IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED";
1073+
10611074
/**
10621075
* When set to {@code true}, cache metrics are not included into the discovery metrics update message (in this
10631076
* case message contains only cluster metrics). By default cache metrics are included into the message and

modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,10 @@ enum DiscoveryDataExchangeType {
7070
CACHE_CRD_PROC,
7171

7272
/** Encryption manager. */
73-
ENCRYPTION_MGR
73+
ENCRYPTION_MGR,
74+
75+
/** Service processor. */
76+
SERVICE_PROC
7477
}
7578

7679
/**

modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
3737
import org.apache.ignite.internal.stat.IoStatisticsManager;
3838
import org.apache.ignite.internal.processors.compress.CompressionProcessor;
39+
import org.apache.ignite.internal.processors.service.ServiceProcessorAdapter;
3940
import org.apache.ignite.internal.worker.WorkersRegistry;
4041
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
4142
import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
@@ -67,7 +68,6 @@
6768
import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter;
6869
import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
6970
import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
70-
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
7171
import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
7272
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
7373
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
@@ -224,7 +224,7 @@ public interface GridKernalContext extends Iterable<GridComponent> {
224224
*
225225
* @return Service processor.
226226
*/
227-
public GridServiceProcessor service();
227+
public ServiceProcessorAdapter service();
228228

229229
/**
230230
* Gets port processor.

modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.ignite.internal.managers.failover.GridFailoverManager;
4949
import org.apache.ignite.internal.managers.indexing.GridIndexingManager;
5050
import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
51+
import org.apache.ignite.internal.processors.service.ServiceProcessorAdapter;
5152
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
5253
import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
5354
import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
@@ -84,7 +85,6 @@
8485
import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter;
8586
import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
8687
import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
87-
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
8888
import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
8989
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
9090
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
@@ -208,7 +208,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
208208

209209
/** */
210210
@GridToStringInclude
211-
private GridServiceProcessor svcProc;
211+
private ServiceProcessorAdapter srvcProc;
212212

213213
/** */
214214
@GridToStringInclude
@@ -608,8 +608,8 @@ else if (comp instanceof GridPortProcessor)
608608
portProc = (GridPortProcessor)comp;
609609
else if (comp instanceof GridClosureProcessor)
610610
closProc = (GridClosureProcessor)comp;
611-
else if (comp instanceof GridServiceProcessor)
612-
svcProc = (GridServiceProcessor)comp;
611+
else if (comp instanceof ServiceProcessorAdapter)
612+
srvcProc = (ServiceProcessorAdapter)comp;
613613
else if (comp instanceof IgniteScheduleProcessorAdapter)
614614
scheduleProc = (IgniteScheduleProcessorAdapter)comp;
615615
else if (comp instanceof GridSegmentationProcessor)
@@ -762,8 +762,8 @@ else if (helper instanceof HadoopHelper)
762762
}
763763

764764
/** {@inheritDoc} */
765-
@Override public GridServiceProcessor service() {
766-
return svcProc;
765+
@Override public ServiceProcessorAdapter service() {
766+
return srvcProc;
767767
}
768768

769769
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,10 @@ public enum GridTopic {
136136
TOPIC_CACHE_COORDINATOR,
137137

138138
/** */
139-
TOPIC_GEN_ENC_KEY;
139+
TOPIC_GEN_ENC_KEY,
140+
141+
/** */
142+
TOPIC_SERVICES;
140143

141144
/** Enum values. */
142145
private static final GridTopic[] VALS = values();

modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@
116116
import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
117117
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
118118
import org.apache.ignite.internal.processors.GridProcessor;
119+
import org.apache.ignite.internal.processors.GridProcessorAdapter;
119120
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
120121
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
121122
import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
@@ -163,6 +164,7 @@
163164
import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
164165
import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
165166
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
167+
import org.apache.ignite.internal.processors.service.IgniteServiceProcessor;
166168
import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
167169
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
168170
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
@@ -224,6 +226,7 @@
224226
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
225227
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
226228
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON;
229+
import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
227230
import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_ASCII;
228231
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
229232
import static org.apache.ignite.IgniteSystemProperties.IGNITE_REST_START_ON_CLIENT;
@@ -251,6 +254,7 @@
251254
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
252255
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
253256
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED;
257+
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
254258
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_FEATURES;
255259
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
256260
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
@@ -1016,7 +1020,7 @@ public void start(
10161020
startProcessor(new GridCacheProcessor(ctx));
10171021
startProcessor(new GridQueryProcessor(ctx));
10181022
startProcessor(new ClientListenerProcessor(ctx));
1019-
startProcessor(new GridServiceProcessor(ctx));
1023+
startProcessor(createServiceProcessor());
10201024
startProcessor(new GridTaskSessionProcessor(ctx));
10211025
startProcessor(new GridJobProcessor(ctx));
10221026
startProcessor(new GridTaskProcessor(ctx));
@@ -1356,6 +1360,20 @@ private HadoopProcessorAdapter createHadoopComponent() throws IgniteCheckedExcep
13561360
}
13571361
}
13581362

1363+
/**
1364+
* Creates service processor depend on {@link IgniteSystemProperties#IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED}.
1365+
*
1366+
* @return Service processor.
1367+
*/
1368+
private GridProcessorAdapter createServiceProcessor() {
1369+
final boolean srvcProcMode = getBoolean(IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED, true);
1370+
1371+
if (srvcProcMode)
1372+
return new IgniteServiceProcessor(ctx);
1373+
1374+
return new GridServiceProcessor(ctx);
1375+
}
1376+
13591377
/**
13601378
* Validates common configuration parameters.
13611379
*
@@ -1643,6 +1661,9 @@ private void fillNodeAttributes(boolean notifyEnabled) throws IgniteCheckedExcep
16431661
ctx.addNodeAttribute(e.getKey(), e.getValue());
16441662
}
16451663
}
1664+
1665+
ctx.addNodeAttribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED,
1666+
ctx.service() instanceof IgniteServiceProcessor);
16461667
}
16471668

16481669
/**

modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,10 @@ public final class IgniteNodeAttributes {
208208
/** Supported features. */
209209
public static final String ATTR_IGNITE_FEATURES = ATTR_PREFIX + ".features";
210210

211+
/** Ignite services processor mode. */
212+
public static final String ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED = ATTR_PREFIX +
213+
".event.driven.service.processor.enabled";
214+
211215
/**
212216
* Enforces singleton.
213217
*/

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,9 @@
184184
import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
185185
import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRequest;
186186
import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse;
187+
import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResult;
188+
import org.apache.ignite.internal.processors.service.ServiceDeploymentProcessId;
189+
import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResultBatch;
187190
import org.apache.ignite.internal.util.GridByteArrayList;
188191
import org.apache.ignite.internal.util.GridIntList;
189192
import org.apache.ignite.internal.util.GridLongList;
@@ -1120,6 +1123,21 @@ public GridIoMessageFactory(MessageFactory[] ext) {
11201123

11211124
break;
11221125

1126+
case 167:
1127+
msg = new ServiceDeploymentProcessId();
1128+
1129+
break;
1130+
1131+
case 168:
1132+
msg = new ServiceSingleNodeDeploymentResultBatch();
1133+
1134+
break;
1135+
1136+
case 169:
1137+
msg = new ServiceSingleNodeDeploymentResult();
1138+
1139+
break;
1140+
11231141
// [-3..119] [124..129] [-23..-28] [-36..-55] - this
11241142
// [120..123] - DR
11251143
// [-4..-22, -30..-35] - SQL

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@
143143
import static java.util.concurrent.TimeUnit.MILLISECONDS;
144144
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
145145
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
146+
import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
146147
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
147148
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SECURITY_COMPATIBILITY_MODE;
148149
import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -157,6 +158,7 @@
157158
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
158159
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_REGIONS_OFFHEAP_SIZE;
159160
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
161+
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
160162
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
161163
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
162164
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2;
@@ -789,6 +791,8 @@ else if (customMsg instanceof ChangeGlobalStateMessage) {
789791

790792
ctx.cache().context().exchange().onLocalJoin(discoEvt, discoCache);
791793

794+
ctx.service().onLocalJoin(discoEvt, discoCache);
795+
792796
ctx.authentication().onLocalJoin();
793797

794798
ctx.encryption().onLocalJoin();
@@ -847,6 +851,8 @@ else if (type == EVT_CLIENT_NODE_RECONNECTED) {
847851

848852
ctx.cache().context().exchange().onLocalJoin(localJoinEvent(), discoCache);
849853

854+
ctx.service().onLocalJoin(localJoinEvent(), discoCache);
855+
850856
ctx.cluster().clientReconnectFuture().listen(new CI1<IgniteFuture<?>>() {
851857
@Override public void apply(IgniteFuture<?> fut) {
852858
try {
@@ -1223,6 +1229,7 @@ private void checkAttributes(Iterable<ClusterNode> nodes) throws IgniteCheckedEx
12231229

12241230
boolean locDelayAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
12251231

1232+
Boolean locSrvcProcMode = locNode.attribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED);
12261233
Boolean locSecurityCompatibilityEnabled = locNode.attribute(ATTR_SECURITY_COMPATIBILITY_MODE);
12271234

12281235
for (ClusterNode n : nodes) {
@@ -1308,6 +1315,22 @@ private void checkAttributes(Iterable<ClusterNode> nodes) throws IgniteCheckedEx
13081315
", rmtAddrs=" + U.addressesAsString(n) + ", rmtNode=" + U.toShortString(n) + "]");
13091316
}
13101317

1318+
Boolean rmtSrvcProcModeAttr = n.attribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED);
1319+
1320+
final boolean rmtSrvcProcMode = rmtSrvcProcModeAttr != null ? rmtSrvcProcModeAttr : false;
1321+
1322+
if (!F.eq(locSrvcProcMode, rmtSrvcProcMode)) {
1323+
throw new IgniteCheckedException("Local node's " + IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED +
1324+
" property value differs from remote node's value " +
1325+
"(to make sure all nodes in topology have identical service processor mode, " +
1326+
"configure system property explicitly) " +
1327+
"[locSrvcProcMode=" + locSrvcProcMode +
1328+
", rmtSrvcProcMode=" + rmtSrvcProcMode +
1329+
", locNodeAddrs=" + U.addressesAsString(locNode) +
1330+
", rmtNodeAddrs=" + U.addressesAsString(n) +
1331+
", locNodeId=" + locNode.id() + ", rmtNode=" + U.toShortString(n) + "]");
1332+
}
1333+
13111334
if (n.version().compareToIgnoreTimestamp(SERVICE_PERMISSIONS_SINCE) >= 0
13121335
&& ctx.security().enabled() // Matters only if security enabled.
13131336
) {

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2424
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
2525
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
26+
import org.apache.ignite.internal.processors.service.ServiceDeploymentActions;
27+
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
2628
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
2729
import org.apache.ignite.internal.util.typedef.F;
2830
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -52,6 +54,10 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
5254
/** Restarting caches. */
5355
private Set<String> restartingCaches;
5456

57+
/** Affinity (cache related) services updates to be processed on services deployment process. */
58+
@GridToStringExclude
59+
@Nullable private transient ServiceDeploymentActions serviceDeploymentActions;
60+
5561
/**
5662
* @param reqs Requests.
5763
*/
@@ -117,6 +123,20 @@ void exchangeActions(ExchangeActions exchangeActions) {
117123
this.exchangeActions = exchangeActions;
118124
}
119125

126+
/**
127+
* @return Services deployment actions to be processed on services deployment process.
128+
*/
129+
@Nullable public ServiceDeploymentActions servicesDeploymentActions() {
130+
return serviceDeploymentActions;
131+
}
132+
133+
/**
134+
* @param serviceDeploymentActions Services deployment actions to be processed on services deployment process.
135+
*/
136+
public void servicesDeploymentActions(ServiceDeploymentActions serviceDeploymentActions) {
137+
this.serviceDeploymentActions = serviceDeploymentActions;
138+
}
139+
120140
/**
121141
* @return {@code True} if required to start all caches on client node.
122142
*/

0 commit comments

Comments
 (0)