1
+ from dataclasses import dataclass , field
1
2
import json
2
3
import os
3
4
4
- from dataclasses import dataclass
5
+ from collections import defaultdict
6
+ from enum import Enum
5
7
from profiler .classes import (
6
8
TraceData ,
7
9
ControlRegUpdates ,
@@ -27,6 +29,141 @@ def setup_enable_to_tid(
27
29
)
28
30
29
31
32
+ class EventType (Enum ):
33
+ START = 1
34
+ END = 2
35
+
36
+
37
+ @dataclass
38
+ class CtrlGroupEvent :
39
+ uuid_name : str
40
+ sid_name : str
41
+ ctrl_group_name : str
42
+ timestamp : int
43
+ event_type : EventType
44
+
45
+ def __repr__ (self ):
46
+ event_type_str = (
47
+ "TrackEvent.TYPE_SLICE_BEGIN"
48
+ if self .event_type == EventType .START
49
+ else "TrackEvent.TYPE_SLICE_END"
50
+ )
51
+ return f"add_slice_event(builder, ts={ self .timestamp } , event_type={ event_type_str } , event_track_uuid={ self .uuid_name } , name={ self .ctrl_group_name } , SID={ self .sid_name } )"
52
+
53
+
54
+ @dataclass
55
+ class ProtoTimelineCell :
56
+ fully_qualified_cell_name : str
57
+ sid_name : str
58
+ uuid_names : set [str ] = field (default_factory = set )
59
+ events : list [CtrlGroupEvent ] = field (default_factory = list )
60
+
61
+ def __init__ (self , fully_qualified_cell_name : str ):
62
+ self .fully_qualified_cell_name = fully_qualified_cell_name
63
+ self .sid_name = self .fully_qualified_cell_name .replace ("." , "_" ).upper ()
64
+
65
+ def register_event (self , fully_qualified_ctrl_group : str , timestamp : int , event_type : EventType ):
66
+ uuid = fully_qualified_ctrl_group .replace ("." , "_" )
67
+ self .uuid_names .add (uuid )
68
+ new_event = CtrlGroupEvent (uuid , self .sid_name , fully_qualified_ctrl_group , timestamp , event_type )
69
+ self .events .append (new_event )
70
+
71
+ def out_str (self ):
72
+ s = ""
73
+ for uuid in self .uuid_names :
74
+ s += f"\t { uuid } = define_track(builder, { self .sid_name } )\n "
75
+ for event in self .events :
76
+ s += f"\t { event } \n "
77
+ return s
78
+
79
+
80
+ @dataclass
81
+ class ProtoTimeline :
82
+ # very silly way to do this, but will fix this later. Best practice may be to have a preexisting
83
+ # template file that we replace a small part of
84
+ before = """
85
+ #!/usr/bin/env python3
86
+ import uuid
87
+
88
+ from perfetto.trace_builder.proto_builder import TraceProtoBuilder
89
+ from perfetto.protos.perfetto.trace.perfetto_trace_pb2 import TrackEvent, TrackDescriptor, ProcessDescriptor, ThreadDescriptor
90
+
91
+ # Helper to define a new track with a unique UUID
92
+ def define_track(builder, group_name):
93
+ track_uuid = uuid.uuid4().int & ((1 << 63) - 1)
94
+ packet = builder.add_packet()
95
+ packet.track_descriptor.uuid = track_uuid
96
+ packet.track_descriptor.name = group_name
97
+ return track_uuid
98
+
99
+ # Helper to add a begin or end slice event to a specific track
100
+ def add_slice_event(builder, ts, event_type, event_track_uuid, name=None, SID=None):
101
+ packet = builder.add_packet()
102
+ packet.timestamp = ts
103
+ packet.track_event.type = event_type
104
+ packet.track_event.track_uuid = event_track_uuid
105
+ if name:
106
+ packet.track_event.name = name
107
+ packet.trusted_packet_sequence_id = SID
108
+
109
+ def populate_packets(builder: TraceProtoBuilder):
110
+ """
111
+
112
+ after = """
113
+ def main():
114
+ builder = TraceProtoBuilder()
115
+ populate_packets(builder)
116
+
117
+ output_filename = "my_custom_trace.pftrace"
118
+ with open(output_filename, 'wb') as f:
119
+ f.write(builder.serialize())
120
+
121
+ print(f"Trace written to {output_filename}")
122
+ print(f"Open with [https://ui.perfetto.dev](https://ui.perfetto.dev).")
123
+
124
+ if __name__ == "__main__":
125
+ main()
126
+
127
+ """
128
+
129
+ cell_infos : defaultdict [str , ProtoTimelineCell ] = field (default_factory = defaultdict )
130
+
131
+ def emit (self , out_file ):
132
+ with open (out_file , "w" ) as out :
133
+ out .write (self .before )
134
+ for cell in self .cell_infos :
135
+ out .write (self .cell_infos [cell ].out_str ())
136
+ out .write (self .after )
137
+
138
+
139
+
140
+ def compute_ctrl_timeline (tracedata : TraceData , cell_metadata : CellMetadata , out_dir : str ):
141
+ proto : ProtoTimeline = ProtoTimeline ()
142
+
143
+ currently_active_ctrl_groups : set [str ] = set ()
144
+
145
+ for i in tracedata .trace :
146
+ this_cycle_active_ctrl_groups : set [str ] = set ()
147
+ for stack in tracedata .trace [i ].stacks :
148
+ stack_acc = cell_metadata .main_shortname
149
+ for stack_elem in stack :
150
+ match stack_elem .element_type :
151
+ case StackElementType .CELL :
152
+ if not stack_elem .is_main :
153
+ stack_acc = f"{ stack_acc } .{ stack_elem .name } "
154
+ case StackElementType .CONTROL_GROUP :
155
+ this_cycle_active_ctrl_groups .add (f"{ stack_acc } .{ stack_elem .name } " )
156
+
157
+ for new_ctrl_group in this_cycle_active_ctrl_groups .difference (currently_active_ctrl_groups ):
158
+ proto .cell_infos [stack_acc ].register_event (new_ctrl_group , i , EventType .START )
159
+ for gone_ctrl_group in currently_active_ctrl_groups .difference (this_cycle_active_ctrl_groups ):
160
+ proto .cell_infos [stack_acc ].register_event (gone_ctrl_group , i , EventType .END )
161
+
162
+ currently_active_ctrl_groups = this_cycle_active_ctrl_groups
163
+
164
+ out_path = os .path .join (out_dir , "timeline-proto-gen.py" )
165
+ proto .emit (out_path )
166
+
30
167
class TimelineCell :
31
168
"""
32
169
Bookkeeping for forming cells and their groups
@@ -140,7 +277,6 @@ class ActiveEnable:
140
277
enable_name : str
141
278
cell_name : str # cell from which enable is active from
142
279
143
-
144
280
def compute_timeline (
145
281
tracedata : TraceData ,
146
282
cell_metadata : CellMetadata ,
0 commit comments