Skip to content

Commit af349bf

Browse files
committed
[server][agent.java] xlog sampling follows commencement service's. If the commencement service is not discarded by sampling rate, the following children services are also not discarded.
1 parent 398abad commit af349bf

File tree

4 files changed

+114
-52
lines changed

4 files changed

+114
-52
lines changed

scouter.common/src/main/java/scouter/lang/pack/XLogProfilePack2.java

+27
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
*/
3030
public class XLogProfilePack2 extends XLogProfilePack implements Pack {
3131

32+
private boolean _forDrop;
33+
private boolean _forProcessDelayingChildren;
34+
3235
public long gxid;
3336
public byte xType;
3437
public byte discardType;
@@ -71,4 +74,28 @@ public Pack read(DataInputX din) throws IOException {
7174
return this;
7275
}
7376

77+
public static XLogProfilePack2 forInternalDropProcessing(XLogPack xLogPack) {
78+
XLogProfilePack2 pack = new XLogProfilePack2();
79+
pack.gxid = xLogPack.gxid;
80+
pack.txid = xLogPack.txid;
81+
pack._forDrop = true;
82+
return pack;
83+
}
84+
85+
public static XLogProfilePack2 forInternalDelayingChildrenProcessing(XLogPack xLogPack) {
86+
XLogProfilePack2 pack = new XLogProfilePack2();
87+
pack.gxid = xLogPack.gxid;
88+
pack.txid = xLogPack.txid;
89+
pack._forProcessDelayingChildren = true;
90+
return pack;
91+
}
92+
93+
public boolean isForDrop() {
94+
return _forDrop;
95+
}
96+
97+
public boolean isForProcessDelayingChildren() {
98+
return _forProcessDelayingChildren;
99+
}
100+
74101
}

scouter.server/src/main/java/scouter/server/core/cache/ProfileDelayingCache.java

+10-11
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
import scouter.lang.pack.XLogPack;
2019
import scouter.lang.pack.XLogProfilePack2;
2120
import scouter.server.Configure;
2221
import scouter.server.Logger;
@@ -126,26 +125,26 @@ public void addDelaying(XLogProfilePack2 profilePack) {
126125
profilePacks.add(profilePack);
127126
}
128127

129-
public void removeDelayingChildren(XLogPack drivingXLogPack) {
130-
if (drivingXLogPack.isDriving()) {
131-
gxidProfiles.remove(drivingXLogPack.txid);
128+
public void removeDelayingChildren(XLogProfilePack2 drivingPack) {
129+
if (drivingPack.isDriving()) {
130+
gxidProfiles.remove(drivingPack.txid);
132131
for (SoftReference<LongKeyLinkedMap<List<XLogProfilePack2>>> profilesRef : gxidProfilesOld) {
133132
LongKeyLinkedMap<List<XLogProfilePack2>> profilesInOld = profilesRef.get();
134133
if (profilesInOld == null) {
135134
continue;
136135
}
137-
profilesInOld.remove(drivingXLogPack.txid);
136+
profilesInOld.remove(drivingPack.txid);
138137
}
139138
}
140139
}
141140

142-
public List<XLogProfilePack2> popDelayingChildren(XLogPack drivingXLogPack) {
143-
if (!drivingXLogPack.isDriving()) {
141+
public List<XLogProfilePack2> popDelayingChildren(XLogProfilePack2 drivingPack) {
142+
if (!drivingPack.isDriving()) {
144143
return Collections.emptyList();
145144
}
146-
List<XLogProfilePack2> children = gxidProfiles.get(drivingXLogPack.txid);
145+
List<XLogProfilePack2> children = gxidProfiles.get(drivingPack.txid);
147146
if (children != null) {
148-
gxidProfiles.remove(drivingXLogPack.txid);
147+
gxidProfiles.remove(drivingPack.txid);
149148
} else {
150149
children = new ArrayList<>();
151150
}
@@ -155,9 +154,9 @@ public List<XLogProfilePack2> popDelayingChildren(XLogPack drivingXLogPack) {
155154
if (profilesInOld == null) {
156155
continue;
157156
}
158-
List<XLogProfilePack2> childrenInOld = profilesInOld.get(drivingXLogPack.txid);
157+
List<XLogProfilePack2> childrenInOld = profilesInOld.get(drivingPack.txid);
159158
if (childrenInOld != null) {
160-
profilesInOld.remove(drivingXLogPack.txid);
159+
profilesInOld.remove(drivingPack.txid);
161160
children.addAll(childrenInOld);
162161
}
163162
}

scouter.server/src/main/scala/scouter/server/core/ProfilePreCore.scala

+56-14
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,67 @@ object ProfilePreCore {
4141
}) {
4242
val pack = queue.get();
4343
ServerStat.put("profile.core0.queue", queue.size());
44-
if (BytesUtil.getLength(pack.profile) > 0) {
44+
45+
val pack2 = pack match {
46+
case _pack: XLogProfilePack2 => _pack
47+
case _ => null
48+
};
49+
50+
//Drop profile Pack by txid
51+
if (pack2 != null && pack2.isForDrop) {
52+
ProfileDelayingCache.instance.removeDelayingChildren(pack2);
53+
54+
//Process delaying profile Pack by txid
55+
} else if (pack2 != null && pack2.isForProcessDelayingChildren) {
56+
processDelayingProfiles(pack2)
57+
58+
} else if (BytesUtil.getLength(pack.profile) > 0) {
4559
if (canProcess(pack)) {
4660
processOnCondition(pack);
4761
} else {
62+
//it must be XLogProfilePack2 type. (by canProcess() results false)
4863
waitOnMemory(pack.asInstanceOf[XLogProfilePack2]);
4964
}
5065
}
5166
}
5267

53-
def canProcess(pack: XLogProfilePack): Boolean = {
68+
def add(p: XLogProfilePack) {
69+
p.time = System.currentTimeMillis();
70+
71+
val ok = queue.put(p)
72+
if (!ok) {
73+
Logger.println("S110-0", 10, "queue exceeded!!");
74+
}
75+
}
76+
77+
def addAsDropped(p: XLogPack): Unit = {
78+
val profilePack = XLogProfilePack2.forInternalDropProcessing(p);
79+
val ok = queue.put(profilePack)
80+
if (!ok) {
81+
Logger.println("S110-0-1", 10, "queue exceeded!!");
82+
}
83+
}
84+
85+
def addAsProcessDelayingChildren(p: XLogPack): Unit = {
86+
val profilePack = XLogProfilePack2.forInternalDelayingChildrenProcessing(p);
87+
val ok = queue.put(profilePack)
88+
if (!ok) {
89+
Logger.println("S110-0-2", 10, "queue exceeded!!");
90+
}
91+
}
92+
93+
private def processDelayingProfiles(pack2: XLogProfilePack2) = {
94+
val profileList = ProfileDelayingCache.instance.popDelayingChildren(pack2);
95+
profileList.forEach(new Consumer[XLogProfilePack] {
96+
override def accept(delayingPack: XLogProfilePack): Unit = {
97+
if (pack2.discardType != XLogDiscardTypes.DISCARD_ALL) {
98+
ProfileCore.add(delayingPack)
99+
}
100+
}
101+
});
102+
}
103+
104+
private def canProcess(pack: XLogProfilePack): Boolean = {
54105
if (pack.getPackType() == PackEnum.XLOG_PROFILE2) {
55106
val pack2 = pack.asInstanceOf[XLogProfilePack2];
56107
return (pack2.isDriving()
@@ -63,7 +114,7 @@ object ProfilePreCore {
63114
return true;
64115
}
65116

66-
def processOnCondition(pack: XLogProfilePack): Unit = {
117+
private def processOnCondition(pack: XLogProfilePack): Unit = {
67118
if (pack.getPackType() == PackEnum.XLOG_PROFILE2) {
68119
val pack2 = pack.asInstanceOf[XLogProfilePack2];
69120
if (pack2.ignoreGlobalConsequentSampling) {
@@ -78,18 +129,9 @@ object ProfilePreCore {
78129
}
79130
}
80131

81-
def waitOnMemory(pack2: XLogProfilePack2): Unit = {
132+
private def waitOnMemory(pack2: XLogProfilePack2): Unit = {
82133
ProfileDelayingCache.instance.addDelaying(pack2);
83134
}
84135

85-
def doNothing() {}
86-
87-
def add(p: XLogProfilePack) {
88-
p.time = System.currentTimeMillis();
89-
90-
val ok = queue.put(p)
91-
if (!ok) {
92-
Logger.println("S110-0", 10, "queue exceeded!!");
93-
}
94-
}
136+
private def doNothing() {}
95137
}

scouter.server/src/main/scala/scouter/server/core/XLogPreCore.scala

+21-27
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ object XLogPreCore {
3737
val pack = queue.get();
3838
ServerStat.put("xlog.core0.queue", queue.size());
3939
if (Configure.WORKABLE) {
40-
if (pack.isDropped()) {
40+
if (pack.isDropped) {
4141
//discard dropped in delaying
4242
XLogDelayingCache.instance.removeDelayingChildren(pack);
43-
ProfileDelayingCache.instance.removeDelayingChildren(pack);
43+
ProfilePreCore.addAsDropped(pack);
4444

4545
} else {
4646
if (canProcess(pack)) {
@@ -52,7 +52,18 @@ object XLogPreCore {
5252
}
5353
}
5454

55-
def processOnCondition(pack: XLogPack): Unit = {
55+
def add(pack: XLogPack) {
56+
if (pack.endTime == 0) {
57+
pack.endTime = System.currentTimeMillis();
58+
}
59+
60+
val ok = queue.put(pack);
61+
if (!ok) {
62+
Logger.println("S116-0", 10, "queue exceeded!!");
63+
}
64+
}
65+
66+
private def processOnCondition(pack: XLogPack): Unit = {
5667
if (pack.ignoreGlobalConsequentSampling) {
5768
if (XLogDiscardTypes.isAliveXLog(pack.discardType)) {
5869
process0(pack);
@@ -62,13 +73,13 @@ object XLogPreCore {
6273
}
6374
}
6475

65-
def process0(pack: XLogPack) {
76+
private def process0(pack: XLogPack) {
6677
XLogDelayingCache.instance.addProcessed(pack);
6778
processDelayingChildren(pack);
6879
XLogCore.add(pack);
6980
}
7081

71-
def canProcess(pack: XLogPack): Boolean = {
82+
private def canProcess(pack: XLogPack): Boolean = {
7283
if (pack.isDriving()
7384
|| pack.ignoreGlobalConsequentSampling
7485
|| pack.discardType == 0
@@ -83,16 +94,11 @@ object XLogPreCore {
8394
}
8495
}
8596

86-
def processDelayingChildren(pack: XLogPack): Unit = {
87-
val profileList = ProfileDelayingCache.instance.popDelayingChildren(pack);
88-
profileList.forEach(new Consumer[XLogProfilePack] {
89-
override def accept(delayingPack: XLogProfilePack): Unit = {
90-
if (pack.discardType != XLogDiscardTypes.DISCARD_ALL) {
91-
ProfileCore.add(delayingPack)
92-
}
93-
}
94-
});
97+
private def processDelayingChildren(pack: XLogPack): Unit = {
98+
//for profile
99+
ProfilePreCore.addAsProcessDelayingChildren(pack);
95100

101+
//for xlog
96102
val xLogList = XLogDelayingCache.instance.popDelayingChildren(pack);
97103
xLogList.forEach(new Consumer[XLogPack] {
98104
override def accept(delayingPack: XLogPack): Unit = {
@@ -103,20 +109,8 @@ object XLogPreCore {
103109
});
104110
}
105111

106-
def waitOnMemory(pack: XLogPack): Unit = {
112+
private def waitOnMemory(pack: XLogPack): Unit = {
107113
XLogDelayingCache.instance.addDelaying(pack);
108114
}
109115

110-
111-
def add(pack: XLogPack) {
112-
if (pack.endTime == 0) {
113-
pack.endTime = System.currentTimeMillis();
114-
}
115-
116-
val ok = queue.put(pack);
117-
if (!ok) {
118-
Logger.println("S116-0", 10, "queue exceeded!!");
119-
}
120-
}
121-
122116
}

0 commit comments

Comments
 (0)