1
+ /*
2
+ * Copyright 2024 PixelsDB.
3
+ *
4
+ * This file is part of Pixels.
5
+ *
6
+ * Pixels is free software: you can redistribute it and/or modify
7
+ * it under the terms of the Affero GNU General Public License as
8
+ * published by the Free Software Foundation, either version 3 of
9
+ * the License, or (at your option) any later version.
10
+ *
11
+ * Pixels is distributed in the hope that it will be useful,
12
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
+ * Affero GNU General Public License for more details.
15
+ *
16
+ * You should have received a copy of the Affero GNU General Public
17
+ * License along with Pixels. If not, see
18
+ * <https://www.gnu.org/licenses/>.
19
+ */
20
+
21
+ #include " visibility/Visibility.h"
22
+ #include < cstring>
23
+ #include < stdexcept>
24
+
25
+ Visibility::Visibility () : allValue(0 ) {
26
+ this ->intendDeleteBitmap = new std::uint64_t [BITMAP_ARRAY_SIZE]();
27
+ this ->actualDeleteBitmap = new std::uint64_t [BITMAP_ARRAY_SIZE]();
28
+ }
29
+
30
+ Visibility::~Visibility () {
31
+ delete[] intendDeleteBitmap;
32
+ delete[] actualDeleteBitmap;
33
+ }
34
+
35
+ void Visibility::getVisibilityBitmap (std::uint64_t epochTs, std::uint64_t *visibilityBitmap) {
36
+ std::lock_guard<std::mutex> lock (mutex_);
37
+
38
+ std::memset (visibilityBitmap, 0 , sizeof (std::uint64_t ) * BITMAP_ARRAY_SIZE);
39
+
40
+ int epochIndex = findEpoch (epochTs);
41
+ std::size_t patchStart = epochArr[epochIndex].patchStartIndex ;
42
+ if (patchStart + CHECKPOINT_SIZE > patchArr.size ()) {
43
+ throw std::logic_error (" Invalid state: patchStartIndex out of bound" );
44
+ }
45
+ std::memcpy (visibilityBitmap, &patchArr[patchStart], CHECKPOINT_SIZE);
46
+
47
+ std::size_t patchPos = patchStart + 32 ;
48
+ std::size_t patchEnd = epochArr[epochIndex].patchEndIndex ;
49
+ if (patchPos <= patchEnd && patchEnd <= patchArr.size ()) {
50
+ for (; patchPos < patchEnd; ++patchPos) {
51
+ SET_BITMAP_BIT (visibilityBitmap, patchArr[patchPos]);
52
+ }
53
+ }
54
+
55
+ }
56
+
57
+ void Visibility::deleteRecord (int rowId, std::uint64_t epochTs) {
58
+ std::lock_guard<std::mutex> lock (mutex_);
59
+ rowId %= RECORDS_PER_VISIBILITY;
60
+ if (GET_BITMAP_BIT (intendDeleteBitmap, rowId) == 0 &&
61
+ GET_BITMAP_BIT (actualDeleteBitmap, rowId) == 0 ) {
62
+ SET_BITMAP_BIT (intendDeleteBitmap, rowId);
63
+ int epochIndex = findEpoch (epochTs);
64
+ patchArr.push_back (static_cast <std::uint8_t >(rowId));
65
+ epochArr[epochIndex].patchEndIndex += 1 ;
66
+ } else {
67
+ throw std::logic_error (" Invalid state: impossible state" );
68
+ }
69
+
70
+ for (int i = 0 ; i < BITMAP_ARRAY_SIZE; ++i) {
71
+ if (actualDeleteBitmap[i] != UINT64_MAX) {
72
+ allValue = -1 ;
73
+ return ;
74
+ }
75
+ }
76
+ allValue = 1 ; // all records are deleted
77
+ }
78
+
79
+ void Visibility::createNewEpoch (std::uint64_t epochTs) {
80
+ std::lock_guard<std::mutex> lock (mutex_);
81
+
82
+ EpochEntry newEpoch{};
83
+ newEpoch.timestamp = epochTs;
84
+ newEpoch.patchStartIndex = patchArr.size ();
85
+
86
+ std::uint8_t checkpointBuf[CHECKPOINT_SIZE];
87
+ std::memcpy (checkpointBuf, intendDeleteBitmap, CHECKPOINT_SIZE);
88
+
89
+ for (int i = 0 ; i < CHECKPOINT_SIZE; ++i) {
90
+ patchArr.push_back (checkpointBuf[i]);
91
+ }
92
+
93
+ newEpoch.patchEndIndex = patchArr.size ();
94
+ epochArr.push_back (newEpoch);
95
+ }
96
+
97
+ std::size_t Visibility::findEpoch (std::uint64_t epochTs) const {
98
+ std::size_t l = 0 , r = epochArr.size () - 1 ;
99
+ while (l <= r) {
100
+ std::size_t mid = l + (r - l) / 2 ;
101
+ if (epochArr[mid].timestamp <= epochTs) {
102
+ l = mid + 1 ;
103
+ } else {
104
+ r = mid - 1 ;
105
+ }
106
+ }
107
+ if (r == -1 ) {
108
+ throw std::logic_error (" Invalid state: epoch not found" );
109
+ }
110
+ return r;
111
+ }
112
+
113
+ void Visibility::cleanEpochArrAndPatchArr (std::uint64_t cleanUpToEpochTs) {
114
+ std::lock_guard<std::mutex> lock (mutex_);
115
+ std::size_t removeCount = 0 ;
116
+ while (removeCount < epochArr.size () &&
117
+ epochArr[removeCount].timestamp <= cleanUpToEpochTs) {
118
+ removeCount++;
119
+ }
120
+
121
+ if (removeCount == 0 ) {
122
+ return ;
123
+ } else if (removeCount >= epochArr.size ()) {
124
+ throw std::logic_error (" Invalid state: cannot remove all epochs" );
125
+ } else {
126
+ std::size_t freePatchUpTo = epochArr[removeCount].patchStartIndex ;
127
+ patchArr.erase (patchArr.begin (), patchArr.begin () + freePatchUpTo);
128
+ for (auto &epoch : epochArr) {
129
+ epoch.patchStartIndex -= freePatchUpTo;
130
+ epoch.patchEndIndex -= freePatchUpTo;
131
+ }
132
+ epochArr.erase (epochArr.begin (), epochArr.begin () + removeCount);
133
+ }
134
+ }
0 commit comments