@@ -22,24 +22,26 @@ package main
22
22
23
23
import (
24
24
"encoding/base64"
25
- "errors"
26
25
"fmt"
27
- "io"
28
26
"log"
29
27
"os"
30
- "strings "
28
+ "sort "
31
29
"time"
32
30
33
31
"github.com/pborman/getopt"
34
32
"go.uber.org/zap"
35
33
36
- "github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
34
+ "github.com/m3db/m3/src/x/ident"
35
+ xtime "github.com/m3db/m3/src/x/time"
37
36
)
38
37
39
38
func main () {
40
39
var (
41
- path = getopt .StringLong ("path" , 'p' , "" , "file path [e.g. /var/lib/m3db/commitlogs/commitlog-0-161023.db]" )
42
- idFilter = getopt .StringLong ("id-filter" , 'f' , "" , "ID Contains Filter (optional)" )
40
+ path = getopt .StringLong ("path" , 'p' , "" , "file path [e.g. /var/lib/m3db/commitlogs/commitlog-0-161023.db]" )
41
+ idFilter = getopt .StringLong ("id-filter" , 'f' , "" , "ID Contains Filter (optional)" )
42
+ idSizeFilter = getopt .IntLong ("id-size-filter" , 's' , 0 , "ID Size (bytes) Filter (optional)" )
43
+ mode = getopt .StringLong ("mode" , 'm' , "" , "Action [print,summary]. Defaults to 'print'" )
44
+ top = getopt .IntLong ("top" , 't' , 0 , "Print out only top N IDs" )
43
45
)
44
46
getopt .Parse ()
45
47
@@ -54,34 +56,40 @@ func main() {
54
56
os .Exit (1 )
55
57
}
56
58
57
- opts := commitlog .NewReaderOptions (commitlog .NewOptions (), false )
58
- reader := commitlog .NewReader (opts )
59
-
60
- _ , err = reader .Open (* path )
59
+ reader , err := newFilteringReader (* path , idFilter , idSizeFilter )
61
60
if err != nil {
62
61
logger .Fatalf ("unable to open reader: %v" , err )
63
62
}
63
+ defer reader .Close ()
64
+
65
+ switch * mode {
66
+ case "summary" :
67
+ err = printSummary (reader , top )
68
+ default :
69
+ err = printMetrics (reader )
70
+ }
71
+ if err != nil {
72
+ logger .Fatalf ("error while reading commitlog: %v" , err )
73
+ }
74
+ }
64
75
76
+ func printMetrics (reader * filteringReader ) error {
65
77
var (
66
78
entryCount uint32
67
79
annotationSizeTotal uint64
68
80
start = time .Now ()
69
81
)
70
82
71
83
for {
72
- entry , err := reader .Read ()
73
- if errors .Is (err , io .EOF ) {
74
- break
75
- }
84
+ entry , found , err := reader .Read ()
76
85
if err != nil {
77
- logger . Fatalf ( "err reading commitlog: %v" , err )
86
+ return err
78
87
}
79
-
80
- series := entry .Series
81
- if * idFilter != "" && ! strings .Contains (series .ID .String (), * idFilter ) {
82
- continue
88
+ if ! found {
89
+ break
83
90
}
84
91
92
+ series := entry .Series
85
93
fmt .Printf ("{id: %s, dp: %+v, ns: %s, shard: %d" , // nolint: forbidigo
86
94
series .ID , entry .Datapoint , entry .Series .Namespace , entry .Series .Shard )
87
95
if len (entry .Annotation ) > 0 {
@@ -96,11 +104,91 @@ func main() {
96
104
97
105
runTime := time .Since (start )
98
106
99
- if err := reader .Close (); err != nil {
100
- log .Fatalf ("unable to close reader: %v" , err )
101
- }
102
-
103
107
fmt .Printf ("\n Running time: %s\n " , runTime ) // nolint: forbidigo
104
108
fmt .Printf ("%d entries read\n " , entryCount ) // nolint: forbidigo
105
109
fmt .Printf ("Total annotation size: %d bytes\n " , annotationSizeTotal ) // nolint: forbidigo
110
+ return nil
106
111
}
112
+
113
+ func printSummary (reader * filteringReader , top * int ) error {
114
+ var (
115
+ entryCount uint32
116
+ start = time .Now ()
117
+ datapointCount = map [ident.ID ]uint32 {}
118
+ totalIDSize uint64
119
+ earliestDatapoint xtime.UnixNano
120
+ oldestDatapoint xtime.UnixNano
121
+ )
122
+
123
+ for {
124
+ entry , found , err := reader .Read ()
125
+ if err != nil {
126
+ return err
127
+ }
128
+ if ! found {
129
+ break
130
+ }
131
+ dp := entry .Datapoint
132
+
133
+ if earliestDatapoint == 0 || earliestDatapoint > dp .TimestampNanos {
134
+ earliestDatapoint = dp .TimestampNanos
135
+ }
136
+ if oldestDatapoint == 0 || oldestDatapoint < dp .TimestampNanos {
137
+ oldestDatapoint = dp .TimestampNanos
138
+ }
139
+
140
+ datapointCount [entry .Series .ID ]++
141
+
142
+ entryCount ++
143
+ }
144
+
145
+ runTime := time .Since (start )
146
+
147
+ fmt .Printf ("\n Running time: %s\n " , runTime ) // nolint: forbidigo
148
+ fmt .Printf ("%d entries read\n " , entryCount ) // nolint: forbidigo
149
+ fmt .Printf ("time range [%s:%s]\n " , earliestDatapoint .String (), oldestDatapoint .String ()) // nolint: forbidigo
150
+
151
+ datapointCountArr := idPairs {}
152
+ sizeArr := idPairs {}
153
+ for ID , count := range datapointCount {
154
+ IDSize := len (ID .Bytes ())
155
+ totalIDSize += uint64 (IDSize )
156
+ datapointCountArr = append (datapointCountArr , idPair {ID : ID , Value : count })
157
+ sizeArr = append (sizeArr , idPair {ID : ID , Value : uint32 (IDSize )})
158
+ }
159
+
160
+ sort .Sort (sort .Reverse (datapointCountArr ))
161
+ sort .Sort (sort .Reverse (sizeArr ))
162
+
163
+ fmt .Printf ("total ID size: %d bytes\n " , totalIDSize ) // nolint: forbidigo
164
+ fmt .Printf ("total distinct number of IDs %d \n " , len (datapointCount )) // nolint: forbidigo
165
+
166
+ limit := len (datapointCountArr )
167
+ if * top > 0 && * top < limit {
168
+ limit = * top
169
+ }
170
+ fmt .Printf ("ID datapoint counts: \n " ) // nolint: forbidigo
171
+ for i := 0 ; i < limit ; i ++ {
172
+ pair := datapointCountArr [i ]
173
+ fmt .Printf ("%-10d %s\n " , pair .Value , pair .ID .String ()) // nolint: forbidigo
174
+ }
175
+
176
+ fmt .Printf ("ID sizes(bytes): \n " ) // nolint: forbidigo
177
+ for i := 0 ; i < limit ; i ++ {
178
+ pair := sizeArr [i ]
179
+ fmt .Printf ("%-10d %s\n " , pair .Value , pair .ID .String ()) // nolint: forbidigo
180
+ }
181
+
182
+ return nil
183
+ }
184
+
185
+ type idPair struct {
186
+ ID ident.ID
187
+ Value uint32
188
+ }
189
+
190
+ type idPairs []idPair
191
+
192
+ func (p idPairs ) Len () int { return len (p ) }
193
+ func (p idPairs ) Swap (i , j int ) { p [i ], p [j ] = p [j ], p [i ] }
194
+ func (p idPairs ) Less (i , j int ) bool { return p [i ].Value < p [j ].Value }
0 commit comments