
Commit a4667a5

Andreas Moll authored and committed
Add start and end time to logs for workflows, dags and tasks
1 parent 6b51bf2 commit a4667a5

4 files changed (+70 -23 lines)

examples/timing.py

Lines changed: 6 additions & 2 deletions
@@ -18,10 +18,14 @@ def random_sleep(data, store, signal, context):
 
 # the callback function for the task that prints the run times
 def print_times(data, store, signal, context):
-    dag_log = store.get(key='log.{}'.format(context.dag_name),
+    dag_log = store.get(key='log.{}.tasks'.format(context.dag_name),
                         section=DataStoreDocumentSection.Meta)
     for task, fields in dag_log.items():
-        print(task, 'on', fields['worker'], 'took', fields['duration'], 'seconds')
+        # The print task has not finished yet, so there is no duration available
+        if task != context.task_name:
+            print(task, 'on', fields['worker'], 'took', fields['duration'], 'seconds')
+        else:
+            print(task, 'on', fields['worker'], 'is still running')
 
 
 # create the main DAG
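
For orientation, a sketch of what this callback now works with: after this commit each task entry under 'log.<dag_name>.tasks' in the Meta section holds 'worker', 'queue' and 'start_time' as soon as the task starts, and gains 'end_time' and 'duration' once it finishes. The snapshot below uses invented values; testing for the 'duration' key would be an alternative to comparing task names against context.task_name.

from datetime import datetime, timedelta

# Hypothetical snapshot of store.get(key='log.<dag>.tasks', section=Meta)
# taken while the print task is still running (worker names are invented)
now = datetime.utcnow()
dag_log = {
    'sleep_task': {'worker': 'worker-01', 'queue': 'task',
                   'start_time': now - timedelta(seconds=3),
                   'end_time': now - timedelta(seconds=1),
                   'duration': 2.0},
    'print_times': {'worker': 'worker-01', 'queue': 'task',
                    'start_time': now},  # no end_time/duration yet
}

for task, fields in dag_log.items():
    if 'duration' in fields:
        print(task, 'on', fields['worker'], 'took', fields['duration'], 'seconds')
    else:
        print(task, 'on', fields['worker'], 'is still running')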

lightflow/models/datastore.py

Lines changed: 3 additions & 2 deletions
@@ -3,6 +3,7 @@
 from bson.binary import Binary
 from bson.objectid import ObjectId
 import pickle
+from datetime import datetime
 from gridfs import GridFS
 from urllib.parse import quote_plus
 
@@ -358,7 +359,7 @@ def _encode_value(self, value):
         Returns:
             object: The encoded value ready to be stored in MongoDB.
         """
-        if isinstance(value, (int, float, str, bool)):
+        if isinstance(value, (int, float, str, bool, datetime)):
             return value
         elif isinstance(value, list):
             return [self._encode_value(item) for item in value]
@@ -388,7 +389,7 @@ def _decode_value(self, value):
         Returns:
             object: The decoded value as a valid Python object.
         """
-        if isinstance(value, (int, float, str, bool)):
+        if isinstance(value, (int, float, str, bool, datetime)):
             return value
         elif isinstance(value, list):
             return [self._decode_value(item) for item in value]
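
The effect of this change is that datetime values are now treated as MongoDB-native and stored as-is instead of falling through to the module's pickle fallback. A standalone sketch of the dispatch, simplified from the two methods above (the pickle branch is an assumption based on the module's pickle import; it is not shown in this diff):

from datetime import datetime
import pickle

def encode_value(value):
    # BSON-native scalars, now including datetime, pass through unchanged
    if isinstance(value, (int, float, str, bool, datetime)):
        return value
    elif isinstance(value, list):
        return [encode_value(item) for item in value]
    else:
        # assumed fallback: anything else gets pickled into bytes
        return pickle.dumps(value)

print(encode_value(datetime.utcnow()))  # stored as a real datetime
print(type(encode_value({1, 2, 3})))    # a set still ends up as pickled bytes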

lightflow/queue/jobs.py

Lines changed: 60 additions & 18 deletions
@@ -1,5 +1,5 @@
 import celery
-from datetime import datetime
+from datetime import datetime, timezone
 from functools import partial
 
 from lightflow.logger import get_logger
@@ -26,7 +26,7 @@ def execute_workflow(self, workflow, workflow_id=None):
         workflow_id (string): If a workflow ID is provided the workflow run will use
             this ID, if not a new ID will be auto generated.
     """
-    start_time = datetime.now()
+    start_time = datetime.utcnow()
 
     logger.info('Running workflow <{}>'.format(workflow.name))
     data_store = DataStore(**self.app.user_options['config'].data_store,
@@ -39,7 +39,7 @@ def execute_workflow(self, workflow, workflow_id=None):
         workflow_id = data_store.add(payload={
            'name': workflow.name,
            'queue': workflow.queue,
-           'start_time': datetime.utcnow()
+           'start_time': start_time
        })
        logger.info('Created workflow ID: {}'.format(workflow_id))
 
@@ -48,7 +48,7 @@ def execute_workflow(self, workflow, workflow_id=None):
                     job_type=JobType.Workflow,
                     name=workflow.name,
                     queue=workflow.queue,
-                    time=datetime.utcnow(),
+                    time=start_time,
                     workflow_id=workflow_id,
                     duration=None)
 
@@ -62,7 +62,7 @@ def execute_workflow(self, workflow, workflow_id=None):
         'type': JobType.Workflow,
         'workflow_id': workflow_id,
         'queue': workflow.queue,
-        'start_time': datetime.utcnow(),
+        'start_time': start_time,
         'arguments': workflow.provided_arguments})
 
     # run the DAGs in the workflow
@@ -71,6 +71,16 @@ def execute_workflow(self, workflow, workflow_id=None):
                  signal_server=signal_server,
                  workflow_id=workflow_id)
 
+    end_time = datetime.utcnow()
+    duration = (end_time - start_time).total_seconds()
+
+    # update data store with provenance information
+    store_doc = data_store.get(workflow_id)
+    store_doc.set(key='end_time', value=end_time,
+                  section=DataStoreDocumentSection.Meta)
+    store_doc.set(key='duration', value=duration,
+                  section=DataStoreDocumentSection.Meta)
+
     # send custom celery event that the workflow has succeeded
     event_name = JobEventName.Succeeded if not workflow.is_stopped \
         else JobEventName.Aborted
@@ -79,9 +89,9 @@ def execute_workflow(self, workflow, workflow_id=None):
                     job_type=JobType.Workflow,
                     name=workflow.name,
                     queue=workflow.queue,
-                    time=datetime.utcnow(),
+                    time=end_time,
                     workflow_id=workflow_id,
-                    duration=(datetime.now() - start_time).total_seconds())
+                    duration=duration)
 
     logger.info('Finished workflow <{}>'.format(workflow.name))
 
@@ -101,15 +111,23 @@ def execute_dag(self, dag, workflow_id, data=None):
             the first tasks in the dag. This allows the transfer of
             data from dag to dag.
     """
-    start_time = datetime.now()
+    start_time = datetime.utcnow()
     logger.info('Running DAG <{}>'.format(dag.name))
 
+    store_doc = DataStore(**self.app.user_options['config'].data_store,
+                          auto_connect=True).get(workflow_id)
+    store_loc = 'log.{}'.format(dag.name)
+
+    # update data store with provenance information
+    store_doc.set(key='{}.start_time'.format(store_loc), value=start_time,
+                  section=DataStoreDocumentSection.Meta)
+
     # send custom celery event that the dag has been started
     self.send_event(JobEventName.Started,
                     job_type=JobType.Dag,
                     name=dag.name,
                     queue=dag.queue,
-                    time=datetime.utcnow(),
+                    time=start_time,
                     workflow_id=workflow_id,
                     duration=None)
 
@@ -128,15 +146,24 @@ def execute_dag(self, dag, workflow_id, data=None):
                       signal=signal,
                       data=data)
 
+    end_time = datetime.utcnow()
+    duration = (end_time - start_time).total_seconds()
+
+    # update data store with provenance information
+    store_doc.set(key='{}.end_time'.format(store_loc), value=end_time,
+                  section=DataStoreDocumentSection.Meta)
+    store_doc.set(key='{}.duration'.format(store_loc), value=duration,
+                  section=DataStoreDocumentSection.Meta)
+
     # send custom celery event that the dag has succeeded
     event_name = JobEventName.Succeeded if not signal.is_stopped else JobEventName.Aborted
     self.send_event(event_name,
                     job_type=JobType.Dag,
                     name=dag.name,
                     queue=dag.queue,
-                    time=datetime.utcnow(),
+                    time=end_time,
                     workflow_id=workflow_id,
-                    duration=(datetime.now() - start_time).total_seconds())
+                    duration=duration)
 
     logger.info('Finished DAG <{}>'.format(dag.name))
 
@@ -153,10 +180,11 @@ def execute_task(self, task, workflow_id, data=None):
         data (MultiTaskData): An optional MultiTaskData object that contains the data
             that has been passed down from upstream tasks.
     """
-    start_time = datetime.now()
+    start_time = datetime.utcnow()
 
     store_doc = DataStore(**self.app.user_options['config'].data_store,
                           auto_connect=True).get(workflow_id)
+    store_loc = 'log.{}.tasks.{}'.format(task.dag_name, task.name)
 
     def handle_callback(message, event_type, exc=None):
         msg = '{}: {}'.format(message, str(exc)) if exc is not None else message
@@ -169,26 +197,40 @@ def handle_callback(message, event_type, exc=None):
         else:
             logger.info(msg)
 
-        # store a log into the persistent data store
+        current_time = datetime.utcnow()
+
+        # store provenance information about a task
         if event_type != JobEventName.Started:
-            duration = (datetime.now() - start_time).total_seconds()
+            duration = (current_time - start_time).total_seconds()
+
+            store_doc.set(key='{}.end_time'.format(store_loc),
+                          value=current_time,
+                          section=DataStoreDocumentSection.Meta)
 
-            store_doc.set(key='log.{}.{}.duration'.format(task.dag_name, task.name),
+            store_doc.set(key='{}.duration'.format(store_loc),
                           value=duration,
                           section=DataStoreDocumentSection.Meta)
+        else:
+            # store provenance information about a task
+            store_doc.set(key='{}.start_time'.format(store_loc),
+                          value=start_time,
+                          section=DataStoreDocumentSection.Meta)
 
-            store_doc.set(key='log.{}.{}.worker'.format(task.dag_name, task.name),
+            store_doc.set(key='{}.worker'.format(store_loc),
                           value=self.request.hostname,
                           section=DataStoreDocumentSection.Meta)
-        else:
+
+            store_doc.set(key='{}.queue'.format(store_loc),
+                          value=task.queue,
+                          section=DataStoreDocumentSection.Meta)
             duration = None
 
         # send custom celery event
         self.send_event(event_type,
                         job_type=JobType.Task,
                         name=task.name,
                         queue=task.queue,
-                        time=datetime.utcnow(),
+                        time=current_time,
                         workflow_id=workflow_id,
                         duration=duration)
 
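
All three jobs now follow the same timing pattern: take a single UTC timestamp at the start, reuse it for both the data store payload and the Started event, then derive end_time and duration from one final timestamp so the stored log and the Succeeded event report identical values. A minimal sketch of that pattern in isolation:

from datetime import datetime
import time

# one utcnow() at the start, one at the end; every consumer sees the
# same values instead of each call site invoking datetime.now() itself
start_time = datetime.utcnow()

time.sleep(0.5)  # stand-in for running the workflow, dag or task

end_time = datetime.utcnow()
duration = (end_time - start_time).total_seconds()
print('took', round(duration, 2), 'seconds')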

lightflow/version.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = '1.10.1'
+__version__ = '1.10.2'
