Skip to content

Commit 6e4e042

Browse files
author
Dave Flerlage
authored
Use list per transaction instead of one big list for persist_on_commit (#48)
1 parent d323b6b commit 6e4e042

File tree

5 files changed

+237
-69
lines changed

5 files changed

+237
-69
lines changed

temporal_sqlalchemy/bases.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import psycopg2.extras as psql_extras
1414

1515
from temporal_sqlalchemy import nine
16-
from temporal_sqlalchemy.metadata import get_session_metadata
16+
from temporal_sqlalchemy.metadata import STRICT_MODE_KEY
1717

1818
_ClockSet = collections.namedtuple('_ClockSet', ('effective', 'vclock'))
1919

@@ -96,7 +96,7 @@ def record_history(self,
9696
"""record all history for a given clocked object"""
9797
new_tick = self._get_new_tick(clocked)
9898

99-
is_strict_mode = get_session_metadata(session).get('strict_mode', False)
99+
is_strict_mode = session.info[STRICT_MODE_KEY]
100100
vclock_history = attributes.get_history(clocked, 'vclock')
101101
is_vclock_unchanged = vclock_history.unchanged and new_tick == vclock_history.unchanged[0]
102102

temporal_sqlalchemy/metadata.py

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,14 @@
11
import sqlalchemy.orm as orm
22

3-
TEMPORAL_METADATA_KEY = '__temporal'
4-
53
__all__ = [
6-
'get_session_metadata',
7-
'set_session_metadata',
4+
'STRICT_MODE_KEY',
5+
'CHANGESET_STACK_KEY',
6+
'IS_COMMITTING_KEY',
7+
'IS_VCLOCK_UNCHANGED_KEY',
88
]
99

1010

11-
def set_session_metadata(session: orm.Session, metadata: dict):
12-
if isinstance(session, orm.Session):
13-
session.info[TEMPORAL_METADATA_KEY] = metadata
14-
elif isinstance(session, orm.sessionmaker):
15-
session.configure(info={TEMPORAL_METADATA_KEY: metadata})
16-
else:
17-
raise ValueError('Invalid session')
18-
19-
20-
def get_session_metadata(session: orm.Session) -> dict:
21-
"""
22-
:return: metadata dictionary, or None if it was never installed
23-
"""
24-
if isinstance(session, orm.Session):
25-
return session.info.get(TEMPORAL_METADATA_KEY)
26-
elif isinstance(session, orm.sessionmaker):
27-
return session.kw.get('info', {}).get(TEMPORAL_METADATA_KEY)
28-
else:
29-
raise ValueError('Invalid session')
11+
STRICT_MODE_KEY = '__temporal_strict_mode'
12+
CHANGESET_STACK_KEY = '__temporal_changeset_stack'
13+
IS_COMMITTING_KEY = '__temporal_is_committing'
14+
IS_VCLOCK_UNCHANGED_KEY = '__temporal_is_vclock_unchanged'

temporal_sqlalchemy/session.py

Lines changed: 97 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,25 @@
11
import datetime
22
import itertools
33
import typing
4+
import warnings
45

56
import sqlalchemy.event as event
67
import sqlalchemy.orm as orm
78

89
from temporal_sqlalchemy.bases import TemporalOption, Clocked
910
from temporal_sqlalchemy.metadata import (
10-
get_session_metadata,
11-
set_session_metadata
11+
STRICT_MODE_KEY,
12+
CHANGESET_STACK_KEY,
13+
IS_COMMITTING_KEY,
14+
IS_VCLOCK_UNCHANGED_KEY,
1215
)
1316

1417

15-
CHANGESET_KEY = 'changeset'
16-
IS_COMMITTING_KEY = 'is_committing'
17-
IS_VCLOCK_UNCHANGED = 'is_vclock_unchanged'
18+
def get_current_changeset(session):
19+
stack = session.info[CHANGESET_STACK_KEY]
20+
assert len(stack) > 0
1821

19-
20-
def _reset_flags(metadata):
21-
metadata[CHANGESET_KEY] = {}
22-
metadata[IS_COMMITTING_KEY] = False
23-
metadata[IS_VCLOCK_UNCHANGED] = True
22+
return stack[-1]
2423

2524

2625
def _temporal_models(session: orm.Session) -> typing.Iterable[Clocked]:
@@ -30,13 +29,17 @@ def _temporal_models(session: orm.Session) -> typing.Iterable[Clocked]:
3029

3130

3231
def _build_history(session, correlate_timestamp):
33-
metadata = get_session_metadata(session)
32+
# this shouldn't happen, but it might happen, log a warning and investigate
33+
if not session.info.get(CHANGESET_STACK_KEY):
34+
warnings.warn('changeset_stack is missing but we are in _build_history()')
35+
return
3436

35-
items = list(metadata[CHANGESET_KEY].items())
36-
metadata[CHANGESET_KEY].clear()
37+
changeset = get_current_changeset(session)
38+
items = list(changeset.items())
39+
changeset.clear()
3740

38-
is_strict_mode = metadata.get('strict_mode', False)
39-
is_vclock_unchanged = metadata.get(IS_VCLOCK_UNCHANGED, False)
41+
is_strict_mode = session.info[STRICT_MODE_KEY]
42+
is_vclock_unchanged = session.info[IS_VCLOCK_UNCHANGED_KEY]
4043
if items and is_strict_mode:
4144
assert not is_vclock_unchanged, \
4245
'commit() has triggered for a changed temporalized property without a clock tick'
@@ -49,45 +52,102 @@ def persist_history(session: orm.Session, flush_context, instances):
4952
if any(_temporal_models(session.deleted)):
5053
raise ValueError("Cannot delete temporal objects.")
5154

55+
# its possible the temporal session was initialized after the transaction has started
56+
_initialize_metadata(session)
57+
5258
correlate_timestamp = datetime.datetime.now(tz=datetime.timezone.utc)
5359
changed_rows = _temporal_models(itertools.chain(session.dirty, session.new))
5460

55-
metadata = get_session_metadata(session)
61+
changeset = get_current_changeset(session)
5662
for obj in changed_rows:
5763
if obj.temporal_options.allow_persist_on_commit:
5864
new_changes, is_vclock_unchanged = obj.temporal_options.get_history(obj)
5965

6066
if new_changes:
61-
if obj not in metadata[CHANGESET_KEY]:
62-
metadata[CHANGESET_KEY][obj] = {}
67+
if obj not in changeset:
68+
changeset[obj] = {}
6369

64-
old_changes = metadata[CHANGESET_KEY][obj]
70+
old_changes = changeset[obj]
6571
old_changes.update(new_changes)
6672

67-
metadata[IS_VCLOCK_UNCHANGED] = metadata[IS_VCLOCK_UNCHANGED] and is_vclock_unchanged
73+
session.info[IS_VCLOCK_UNCHANGED_KEY] = session.info[IS_VCLOCK_UNCHANGED_KEY] and is_vclock_unchanged
6874
else:
6975
obj.temporal_options.record_history(obj, session, correlate_timestamp)
7076

7177
# if this is the last flush, build all the history
72-
if metadata[IS_COMMITTING_KEY]:
78+
if session.info[IS_COMMITTING_KEY]:
7379
_build_history(session, correlate_timestamp)
7480

75-
_reset_flags(metadata)
81+
session.info[IS_COMMITTING_KEY] = False
7682

7783

7884
def enable_is_committing_flag(session):
79-
metadata = get_session_metadata(session)
80-
81-
metadata[IS_COMMITTING_KEY] = True
85+
"""before_commit happens before before_flush, and we need to make sure the history gets built
86+
during the final one of these two events, so we need to use the gross IS_COMMITTING_KEY flag to
87+
control this behavior"""
88+
session.info[IS_COMMITTING_KEY] = True
8289

90+
# if the session is clean, a final flush won't happen, so try to build the history now
8391
if session._is_clean():
8492
correlate_timestamp = datetime.datetime.now(tz=datetime.timezone.utc)
8593
_build_history(session, correlate_timestamp)
8694

8795
# building the history can cause the session to be dirtied, which will in turn call another
8896
# flush(), so we want to check this before reseting
97+
# if there are more changes, flush will build them itself
8998
if session._is_clean():
90-
_reset_flags(metadata)
99+
session.info[IS_COMMITTING_KEY] = False
100+
101+
102+
def _get_transaction_stack_depth(transaction):
103+
depth = 0
104+
105+
current = transaction
106+
while current:
107+
depth += 1
108+
current = transaction.parent
109+
110+
return depth
111+
112+
113+
def _initialize_metadata(session):
114+
if CHANGESET_STACK_KEY not in session.info:
115+
session.info[CHANGESET_STACK_KEY] = []
116+
117+
if IS_COMMITTING_KEY not in session.info:
118+
session.info[IS_COMMITTING_KEY] = False
119+
120+
if IS_VCLOCK_UNCHANGED_KEY not in session.info:
121+
session.info[IS_VCLOCK_UNCHANGED_KEY] = True
122+
123+
# sometimes temporalize a session after a transaction has already been open, so we need to
124+
# backfill any missing stack entries
125+
if len(session.info[CHANGESET_STACK_KEY]) == 0:
126+
depth = _get_transaction_stack_depth(session.transaction)
127+
for _ in range(depth):
128+
session.info[CHANGESET_STACK_KEY].append({})
129+
130+
131+
def start_transaction(session, transaction):
132+
_initialize_metadata(session)
133+
134+
session.info[CHANGESET_STACK_KEY].append({})
135+
136+
137+
def end_transaction(session, transaction):
138+
# there are some edge cases where no temporal changes actually happen, so don't bother
139+
if not session.info.get(CHANGESET_STACK_KEY):
140+
return
141+
142+
session.info[CHANGESET_STACK_KEY].pop()
143+
144+
# reset bookkeeping fields if we're ending a top most transaction
145+
if transaction.parent is None:
146+
session.info[IS_VCLOCK_UNCHANGED_KEY] = True
147+
session.info[IS_COMMITTING_KEY] = False
148+
149+
# there should be no more changeset stacks at this point, otherwise there is a mismatch
150+
assert len(session.info[CHANGESET_STACK_KEY]) == 0
91151

92152

93153
def temporal_session(session: typing.Union[orm.Session, orm.sessionmaker], strict_mode=False) -> orm.Session:
@@ -98,23 +158,26 @@ def temporal_session(session: typing.Union[orm.Session, orm.sessionmaker], stric
98158
:param strict_mode: if True, will raise exceptions when improper flush() calls are made (default is False)
99159
:return: temporalized SQLALchemy ORM session
100160
"""
101-
temporal_metadata = {
102-
'strict_mode': strict_mode,
103-
}
104-
_reset_flags(temporal_metadata)
105-
106161
# defer listening to the flush hook until after we update the metadata
107162
install_flush_hook = not is_temporal_session(session)
108163

109-
# update to the latest metadata
110-
set_session_metadata(session, temporal_metadata)
164+
if isinstance(session, orm.Session):
165+
session.info[STRICT_MODE_KEY] = strict_mode
166+
elif isinstance(session, orm.sessionmaker):
167+
session.configure(info={STRICT_MODE_KEY: strict_mode})
168+
else:
169+
raise ValueError('Invalid session')
111170

112171
if install_flush_hook:
113172
event.listen(session, 'before_flush', persist_history)
114173
event.listen(session, 'before_commit', enable_is_committing_flag)
115174

175+
# nested transaction handling
176+
event.listen(session, 'after_transaction_create', start_transaction)
177+
event.listen(session, 'after_transaction_end', end_transaction)
178+
116179
return session
117180

118181

119182
def is_temporal_session(session: orm.Session) -> bool:
120-
return isinstance(session, orm.Session) and get_session_metadata(session) is not None
183+
return isinstance(session, orm.Session) and session.info.get(STRICT_MODE_KEY) is not None

tests/models.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,3 +275,10 @@ class PersistOnFlushTable(PersistenceStrategy):
275275
class PersistOnCommitTable(PersistenceStrategy):
276276
__tablename__ = 'persist_on_commit_table'
277277
__table_args__ = {'schema': SCHEMA}
278+
279+
280+
class NonTemporalTable(Base):
281+
__tablename__ = 'non_temporal_table'
282+
__table_args__ = {'schema': SCHEMA}
283+
284+
id = auto_uuid()

0 commit comments

Comments
 (0)