17
17
import org .elasticsearch .action .ActionResponse ;
18
18
import org .elasticsearch .action .ActionType ;
19
19
import org .elasticsearch .action .NoShardAvailableActionException ;
20
+ import org .elasticsearch .action .admin .indices .refresh .TransportShardRefreshAction ;
20
21
import org .elasticsearch .action .support .ActionFilters ;
22
+ import org .elasticsearch .action .support .ActiveShardCount ;
23
+ import org .elasticsearch .action .support .replication .BasicReplicationRequest ;
21
24
import org .elasticsearch .action .support .single .shard .TransportSingleShardAction ;
25
+ import org .elasticsearch .client .internal .node .NodeClient ;
22
26
import org .elasticsearch .cluster .ProjectState ;
23
27
import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
24
28
import org .elasticsearch .cluster .node .DiscoveryNode ;
48
52
public class TransportEnsureDocsSearchableAction extends TransportSingleShardAction <MultiTermVectorsShardRequest , ActionResponse .Empty > {
49
53
50
54
private static final Logger logger = LogManager .getLogger (TransportEnsureDocsSearchableAction .class );
55
+ private final NodeClient client ;
51
56
private final IndicesService indicesService ;
52
57
53
58
private static final String ACTION_NAME = MultiTermVectorsAction .NAME + "/eds" ;
@@ -56,6 +61,7 @@ public class TransportEnsureDocsSearchableAction extends TransportSingleShardAct
56
61
@ Inject
57
62
public TransportEnsureDocsSearchableAction (
58
63
ClusterService clusterService ,
64
+ NodeClient client ,
59
65
TransportService transportService ,
60
66
IndicesService indicesService ,
61
67
ThreadPool threadPool ,
@@ -74,6 +80,7 @@ public TransportEnsureDocsSearchableAction(
74
80
MultiTermVectorsShardRequest ::new ,
75
81
threadPool .executor (ThreadPool .Names .GET )
76
82
);
83
+ this .client = client ;
77
84
this .indicesService = indicesService ;
78
85
}
79
86
@@ -112,7 +119,7 @@ protected void asyncShardOperation(MultiTermVectorsShardRequest request, ShardId
112
119
assert DiscoveryNode .isStateless (clusterService .getSettings ()) : ACTION_NAME + " should only be used in stateless" ;
113
120
assert DiscoveryNode .hasRole (clusterService .getSettings (), DiscoveryNodeRole .INDEX_ROLE )
114
121
: ACTION_NAME + " should only be executed on a stateless indexing node" ;
115
- logger .debug ("received locally {} with {} sub requests" , request , request .locations .size ());
122
+ logger .error ("received locally {} with {} sub requests" , request , request .locations .size ());
116
123
getExecutor (shardId ).execute (() -> ActionListener .run (listener , l -> {
117
124
final IndexService indexService = indicesService .indexServiceSafe (shardId .getIndex ());
118
125
final IndexShard indexShard = indexService .getShard (shardId .id ());
@@ -129,24 +136,45 @@ protected void asyncShardOperation(MultiTermVectorsShardRequest request, ShardId
129
136
// We prefer simplicity to complexity (trying to avoid the unnecessary stateless refresh) for these limited cases.
130
137
boolean docInLiveVersionMap = indexShard .withEngine (engine -> engine .isDocumentInLiveVersionMap (docUid ));
131
138
if (docInLiveVersionMap ) {
132
- logger .debug ("doc id [{}] (uid [{}]) found in live version map of index shard [{}]" , docId , docUid , shardId );
139
+ logger .error ("doc id [{}] (uid [{}]) found in live version map of index shard [{}]" , docId , docUid , shardId );
133
140
docsFoundInLiveVersionMap = true ;
134
141
break ;
135
142
}
136
143
}
137
144
}
138
145
if (docsFoundInLiveVersionMap ) {
139
- logger .debug ("refreshing index shard [{}] due to mtv_eds" , shardId );
140
- indexShard .externalRefresh ("refresh_mtv_eds" , l .map (r -> {
141
- logger .debug ("refreshed index shard [{}] due to mtv_eds" , shardId );
142
- return ActionResponse .Empty .INSTANCE ;
146
+ logger .error ("refreshing index shard [{}] due to mtv_eds" , shardId );
147
+ BasicReplicationRequest refreshRequest = new BasicReplicationRequest (shardId );
148
+ refreshRequest .waitForActiveShards (ActiveShardCount .NONE );
149
+ client .executeLocally (TransportShardRefreshAction .TYPE , refreshRequest , l .delegateFailureAndWrap ((ll , r ) -> {
150
+ if (r .getShardInfo ().getFailed () > 0 ) {
151
+ throw r .getShardInfo ().getFailures ()[0 ];
152
+ }
153
+ // super.asyncShardOperation(request, shardId, ll);
154
+ logger .error ("refreshed index shard [{}] due to mtv_eds" , shardId );
155
+ ll .onResponse (ActionResponse .Empty .INSTANCE );
143
156
}));
157
+
158
+ // indexShard.externalRefresh("refresh_mtv_eds", l.map(r -> {
159
+ // logger.error("refreshed index shard [{}] due to mtv_eds", shardId);
160
+ // return ActionResponse.Empty.INSTANCE;
161
+ // }));
162
+ // [2025-06-30T13:21:14,574][ERROR][o.e.a.t.TransportTermVectorsAction] [search-GyrIOXh] received locally id 1 for shard
163
+ // [test_1][2]
164
+ // [2025-06-30T13:21:14,579][ERROR][o.e.a.t.TransportEnsureDocsSearchableAction] [index-KgxlcuR] received locally
165
+ // org.elasticsearch.action.termvectors.MultiTermVectorsShardRequest/unset with 1 sub requests
166
+ // [2025-06-30T13:21:14,581][ERROR][o.e.a.t.TransportEnsureDocsSearchableAction] [index-KgxlcuR] doc id [1] (uid [[fe 1f]])
167
+ // found in live version map of index shard [[test_1][2]]
168
+ // [2025-06-30T13:21:14,582][ERROR][o.e.a.t.TransportEnsureDocsSearchableAction] [index-KgxlcuR] refreshing index shard
169
+ // [[test_1][2]] due to mtv_eds
170
+ // [2025-06-30T13:21:14,668][ERROR][o.e.a.t.TransportEnsureDocsSearchableAction] [index-KgxlcuR] refreshed index shard
171
+ // [[test_1][2]] due to mtv_eds
144
172
} else {
145
173
// Notice that there cannot be a race between the document(s) being evicted from the live version map due to an
146
174
// ongoing refresh and before the search shards being updated with the new commit, because the documents are
147
175
// guaranteed to be the in the live version map archive until search shards are updated with the new commit.
148
176
// Thus, we can safely respond immediately as a no-op.
149
- logger .debug ("mts_eds does not require refresh of index shard [{}]" , shardId );
177
+ logger .error ("mts_eds does not require refresh of index shard [{}]" , shardId );
150
178
l .onResponse (ActionResponse .Empty .INSTANCE );
151
179
}
152
180
}));
0 commit comments