Skip to content

Commit 0f93772

Browse files
authored
fix: jfr ingestion issue in 0.17.0 (#1112)
1 parent 3458551 commit 0f93772

File tree

3 files changed

+45
-73
lines changed

3 files changed

+45
-73
lines changed

pkg/convert/jfr/parser.go

Lines changed: 36 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import (
1414
"github.com/pyroscope-io/pyroscope/pkg/storage/tree"
1515
)
1616

17-
func ParseJFR(ctx context.Context, r io.Reader, s storage.Putter, pi *storage.PutInput) (err error) {
18-
chunks, err := parser.Parse(r)
17+
func ParseJFR(ctx context.Context, s storage.Putter, body io.Reader, pi *storage.PutInput) (err error) {
18+
chunks, err := parser.Parse(body)
1919
if err != nil {
2020
return fmt.Errorf("unable to parse JFR format: %w", err)
2121
}
@@ -24,11 +24,10 @@ func ParseJFR(ctx context.Context, r io.Reader, s storage.Putter, pi *storage.Pu
2424
err = multierror.Append(err, pErr)
2525
}
2626
}
27-
pi.Val = nil
2827
return err
2928
}
3029

31-
func parse(ctx context.Context, c parser.Chunk, s storage.Putter, pi *storage.PutInput) (err error) {
30+
func parse(ctx context.Context, c parser.Chunk, s storage.Putter, piOriginal *storage.PutInput) (err error) {
3231
var event, alloc, lock string
3332
cpu := tree.New()
3433
wall := tree.New()
@@ -84,83 +83,50 @@ func parse(ctx context.Context, c parser.Chunk, s storage.Putter, pi *storage.Pu
8483
}
8584
}
8685
}
87-
labels := pi.Key.Labels()
88-
prefix := labels["__name__"]
86+
87+
labelsOriginal := piOriginal.Key.Labels()
88+
prefix := labelsOriginal["__name__"]
89+
90+
cb := func(n string, t *tree.Tree, u metadata.Units) {
91+
labels := map[string]string{}
92+
for k, v := range labelsOriginal {
93+
labels[k] = v
94+
}
95+
labels["__name__"] = prefix + "." + n
96+
pi := &storage.PutInput{
97+
StartTime: piOriginal.StartTime,
98+
EndTime: piOriginal.EndTime,
99+
Key: segment.NewKey(labels),
100+
Val: t,
101+
SpyName: piOriginal.SpyName,
102+
SampleRate: piOriginal.SampleRate,
103+
Units: u,
104+
AggregationType: metadata.SumAggregationType,
105+
}
106+
if putErr := s.Put(ctx, pi); putErr != nil {
107+
err = multierror.Append(err, putErr)
108+
}
109+
}
110+
89111
if event == "cpu" || event == "itimer" || event == "wall" {
90112
profile := event
91113
if event == "wall" {
92114
profile = "cpu"
93115
}
94-
labels["__name__"] = prefix + "." + profile
95-
pi.Key = segment.NewKey(labels)
96-
pi.Val = cpu
97-
pi.Units = metadata.SamplesUnits
98-
pi.AggregationType = metadata.SumAggregationType
99-
if putErr := s.Put(ctx, pi); putErr != nil {
100-
err = multierror.Append(err, putErr)
101-
}
116+
cb(profile, cpu, metadata.SamplesUnits)
102117
}
103118
if event == "wall" {
104-
labels["__name__"] = prefix + "." + event
105-
pi.Key = segment.NewKey(labels)
106-
pi.Val = wall
107-
pi.Units = metadata.SamplesUnits
108-
pi.AggregationType = metadata.SumAggregationType
109-
if putErr := s.Put(ctx, pi); putErr != nil {
110-
err = multierror.Append(err, putErr)
111-
}
119+
cb(event, wall, metadata.SamplesUnits)
112120
}
113121
if alloc != "" {
114-
labels["__name__"] = prefix + ".alloc_in_new_tlab_objects"
115-
pi.Key = segment.NewKey(labels)
116-
pi.Val = inTLABObjects
117-
pi.Units = metadata.ObjectsUnits
118-
pi.AggregationType = metadata.SumAggregationType
119-
if putErr := s.Put(ctx, pi); putErr != nil {
120-
err = multierror.Append(err, putErr)
121-
}
122-
labels["__name__"] = prefix + ".alloc_in_new_tlab_bytes"
123-
pi.Key = segment.NewKey(labels)
124-
pi.Val = inTLABBytes
125-
pi.Units = metadata.BytesUnits
126-
pi.AggregationType = metadata.SumAggregationType
127-
if putErr := s.Put(ctx, pi); putErr != nil {
128-
err = multierror.Append(err, putErr)
129-
}
130-
labels["__name__"] = prefix + ".alloc_outside_tlab_objects"
131-
pi.Key = segment.NewKey(labels)
132-
pi.Val = outTLABObjects
133-
pi.Units = metadata.ObjectsUnits
134-
pi.AggregationType = metadata.SumAggregationType
135-
if putErr := s.Put(ctx, pi); putErr != nil {
136-
err = multierror.Append(err, putErr)
137-
}
138-
labels["__name__"] = prefix + ".alloc_outside_tlab_bytes"
139-
pi.Key = segment.NewKey(labels)
140-
pi.Val = outTLABBytes
141-
pi.Units = metadata.BytesUnits
142-
pi.AggregationType = metadata.SumAggregationType
143-
if putErr := s.Put(ctx, pi); putErr != nil {
144-
err = multierror.Append(err, putErr)
145-
}
122+
cb("alloc_in_new_tlab_objects", inTLABObjects, metadata.ObjectsUnits)
123+
cb("alloc_in_new_tlab_bytes", inTLABBytes, metadata.BytesUnits)
124+
cb("alloc_outside_tlab_objects", outTLABObjects, metadata.ObjectsUnits)
125+
cb("alloc_outside_tlab_bytes", outTLABBytes, metadata.BytesUnits)
146126
}
147127
if lock != "" {
148-
labels["__name__"] = prefix + ".lock_count"
149-
pi.Key = segment.NewKey(labels)
150-
pi.Val = lockSamples
151-
pi.Units = metadata.LockSamplesUnits
152-
pi.AggregationType = metadata.SumAggregationType
153-
if putErr := s.Put(ctx, pi); putErr != nil {
154-
err = multierror.Append(err, putErr)
155-
}
156-
labels["__name__"] = prefix + ".lock_duration"
157-
pi.Key = segment.NewKey(labels)
158-
pi.Val = lockDuration
159-
pi.Units = metadata.LockNanosecondsUnits
160-
pi.AggregationType = metadata.SumAggregationType
161-
if putErr := s.Put(ctx, pi); putErr != nil {
162-
err = multierror.Append(err, putErr)
163-
}
128+
cb("lock_count", lockSamples, metadata.LockSamplesUnits)
129+
cb("lock_duration", lockDuration, metadata.LockNanosecondsUnits)
164130
}
165131
return err
166132
}

pkg/parser/parser.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (p *Parser) Put(ctx context.Context, in *PutInput) error {
9393
err = convert.ParseIndividualLines(in.Body, cb)
9494
// with some formats we write directly to storage, hence the early return
9595
case in.Format == "jfr":
96-
return jfr.ParseJFR(ctx, in.Body, p.putter, pi)
96+
return jfr.ParseJFR(ctx, p.putter, in.Body, pi)
9797
case in.Format == "pprof":
9898
return writePprofFromBody(ctx, p.putter, in)
9999
case strings.Contains(in.ContentType, "multipart/form-data"):

pkg/server/ingest_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,18 @@ var _ = Describe("server", func() {
9191
done := make(chan interface{})
9292
go func() {
9393
defer GinkgoRecover()
94-
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
94+
95+
reg := prometheus.NewRegistry()
96+
97+
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), reg, new(health.Controller))
98+
queue := storage.NewIngestionQueue(logrus.StandardLogger(), s, reg, 4, 4)
99+
95100
Expect(err).ToNot(HaveOccurred())
96101
e, _ := exporter.NewExporter(nil, nil)
97102
c, _ := New(Config{
98103
Configuration: &(*cfg).Server,
99104
Storage: s,
100-
Putter: s,
105+
Putter: queue,
101106
MetricsExporter: e,
102107
Logger: logrus.New(),
103108
MetricsRegisterer: prometheus.NewRegistry(),
@@ -145,6 +150,7 @@ var _ = Describe("server", func() {
145150
expectedKey = name
146151
}
147152
sk, _ := segment.ParseKey(expectedKey)
153+
time.Sleep(10 * time.Millisecond)
148154
time.Sleep(sleepDur)
149155
gOut, err := s.Get(context.TODO(), &storage.GetInput{
150156
StartTime: st,

0 commit comments

Comments
 (0)