Skip to content

Commit f4f5552

Browse files
committed
rethinkdb support for hard-deleting p2p messages
1 parent d8033a1 commit f4f5552

File tree

2 files changed

+98
-58
lines changed

2 files changed

+98
-58
lines changed

server/db/mongodb/adapter.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2481,10 +2481,10 @@ func (a *adapter) MessageDeleteList(topic string, toDel *t.DelMessage) error {
24812481
"topic": topic,
24822482
// Skip already hard-deleted messages.
24832483
"delid": b.M{"$exists": false},
2484+
// Skip messages already soft-deleted for the current user
2485+
"deletedfor.user": b.M{"$ne": toDel.DeletedFor},
24842486
}
24852487

2486-
// Skip messages already soft-deleted for the current user
2487-
filter["deletedfor.user"] = b.M{"$ne": toDel.DeletedFor}
24882488
_, err = a.db.Collection("messages").UpdateMany(a.ctx, filter,
24892489
b.M{"$addToSet": b.M{
24902490
"deletedfor": &t.SoftDelete{
@@ -2493,7 +2493,7 @@ func (a *adapter) MessageDeleteList(topic string, toDel *t.DelMessage) error {
24932493
}}})
24942494
}
24952495

2496-
// Now make log entries. Needed for both hard- and soft-deleting.
2496+
// Make log entries. Needed for both hard- and soft-deleting.
24972497
_, err = a.db.Collection("dellog").InsertOne(a.ctx, toDel)
24982498
return err
24992499
}

server/db/rethinkdb/adapter.go

Lines changed: 95 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"encoding/json"
99
"errors"
1010
"hash/fnv"
11+
"sort"
1112
"strconv"
1213
"strings"
1314
"time"
@@ -2257,78 +2258,117 @@ func (a *adapter) messagesHardDelete(topic string) error {
22572258
return err
22582259
}
22592260

2261+
func rangeToQuery(delRanges []t.Range, topic string, query rdb.Term) rdb.Term {
2262+
if len(delRanges) > 1 || delRanges[0].Hi <= delRanges[0].Low {
2263+
var indexVals []any
2264+
for _, rng := range delRanges {
2265+
if rng.Hi == 0 {
2266+
indexVals = append(indexVals, []any{topic, rng.Low})
2267+
} else {
2268+
for i := rng.Low; i <= rng.Hi; i++ {
2269+
indexVals = append(indexVals, []any{topic, i})
2270+
}
2271+
}
2272+
}
2273+
query = query.GetAllByIndex("Topic_SeqId", indexVals...)
2274+
} else {
2275+
// Optimizing for a special case of single range low..hi
2276+
query = query.Between(
2277+
[]any{topic, delRanges[0].Low},
2278+
[]any{topic, delRanges[0].Hi},
2279+
rdb.BetweenOpts{Index: "Topic_SeqId", RightBound: "closed"})
2280+
}
2281+
return query
2282+
}
2283+
22602284
// MessageDeleteList deletes messages in the given topic with seqIds from the list.
22612285
func (a *adapter) MessageDeleteList(topic string, toDel *t.DelMessage) error {
2262-
var indexVals []any
22632286
var err error
22642287

22652288
if toDel == nil {
22662289
// Delete all messages.
2267-
err = a.messagesHardDelete(topic)
2268-
} else {
2269-
// Only some messages are being deleted
2290+
return a.messagesHardDelete(topic)
2291+
}
2292+
2293+
// Only some messages are being deleted
22702294

2271-
// Start with making a log entry
2272-
_, err = rdb.DB(a.dbName).Table("dellog").Insert(toDel).RunWrite(a.conn)
2295+
delRanges := toDel.SeqIdRanges
2296+
2297+
if toDel.DeletedFor == "" {
2298+
// Hard-deleting messages requires updates to the messages table.
2299+
query := rangeToQuery(delRanges, topic, rdb.DB(a.dbName).Table("messages"))
2300+
2301+
// Skip already hard-deleted messages.
2302+
query = query.Filter(rdb.Row.HasFields("DelId").Not())
2303+
2304+
// We are asked to delete messages no older than newerThan.
2305+
if newerThan := toDel.GetNewerThan(); newerThan != nil {
2306+
query = query.Filter(rdb.Row.Field("CreatedAt").Gt(newerThan))
2307+
}
2308+
2309+
query = query.Field("SeqId")
2310+
2311+
// Find the actual IDs still present in the database.
2312+
cursor, err := query.Run(a.conn)
22732313
if err != nil {
22742314
return err
22752315
}
2316+
defer cursor.Close()
22762317

2277-
query := rdb.DB(a.dbName).Table("messages")
2278-
if len(toDel.SeqIdRanges) > 1 || toDel.SeqIdRanges[0].Hi <= toDel.SeqIdRanges[0].Low {
2279-
for _, rng := range toDel.SeqIdRanges {
2280-
if rng.Hi == 0 {
2281-
indexVals = append(indexVals, []any{topic, rng.Low})
2282-
} else {
2283-
for i := rng.Low; i <= rng.Hi; i++ {
2284-
indexVals = append(indexVals, []any{topic, i})
2285-
}
2286-
}
2287-
}
2288-
query = query.GetAllByIndex("Topic_SeqId", indexVals...)
2289-
} else {
2290-
// Optimizing for a special case of single range low..hi
2291-
query = query.Between(
2292-
[]any{topic, toDel.SeqIdRanges[0].Low},
2293-
[]any{topic, toDel.SeqIdRanges[0].Hi},
2294-
rdb.BetweenOpts{Index: "Topic_SeqId", RightBound: "closed"})
2318+
var seqIDs []int
2319+
if err = cursor.All(&seqIDs); err != nil {
2320+
return err
22952321
}
2296-
// Skip already hard-deleted messages.
2297-
query = query.Filter(rdb.Row.HasFields("DelId").Not())
2298-
if toDel.DeletedFor == "" {
2299-
// First decrement use counter for attachments.
2300-
if err = a.decFileUseCounter(query); err == nil {
2301-
// Hard-delete individual messages. Message is not deleted but all fields with personal content
2302-
// are removed.
2303-
_, err = query.Replace(rdb.Row.Without("Head", "From", "Content", "Attachments").Merge(
2304-
map[string]any{
2305-
"DeletedAt": t.TimeNow(), "DelId": toDel.DelId})).
2306-
RunWrite(a.conn)
2307-
}
23082322

2309-
} else {
2310-
// Soft-deleting: adding DelId to DeletedFor
2311-
_, err = query.
2312-
// Skip messages already soft-deleted for the current user
2313-
Filter(func(row rdb.Term) any {
2314-
return rdb.Not(row.Field("DeletedFor").Default([]any{}).Contains(
2315-
func(df rdb.Term) any {
2316-
return df.Field("User").Eq(toDel.DeletedFor)
2317-
}))
2318-
}).Update(map[string]any{"DeletedFor": rdb.Row.Field("DeletedFor").
2319-
Default([]any{}).Append(
2320-
&t.SoftDelete{
2321-
User: toDel.DeletedFor,
2322-
DelId: toDel.DelId})}).RunWrite(a.conn)
2323-
}
2324-
2325-
// If operation has failed, remove dellog record.
2323+
if len(seqIDs) == 0 {
2324+
// Nothing to delete. No need to make a log entry. All done.
2325+
return nil
2326+
}
2327+
2328+
// Recalculate the actual ranges to delete.
2329+
sort.Ints(seqIDs)
2330+
delRanges = t.SliceToRanges(seqIDs)
2331+
2332+
// Compose a new query with the new ranges.
2333+
query = rangeToQuery(delRanges, topic, rdb.DB(a.dbName).Table("messages"))
2334+
2335+
// First decrement use counter for attachments.
2336+
if err = a.decFileUseCounter(query); err != nil {
2337+
return err
2338+
}
2339+
2340+
// Hard-delete individual messages. The messages are not deleted but all fields with personal content
2341+
// are removed.
2342+
if _, err = query.Replace(rdb.Row.Without("Head", "From", "Content", "Attachments").Merge(
2343+
map[string]any{
2344+
"DeletedAt": t.TimeNow(), "DelId": toDel.DelId})).
2345+
RunWrite(a.conn); err != nil {
2346+
return err
2347+
}
2348+
2349+
} else {
2350+
// Soft-deleting: adding DelId to DeletedFor
2351+
_, err = rdb.DB(a.dbName).Table("messages").
2352+
// Skip hard-deleted messages.
2353+
Filter(rdb.Row.HasFields("DelId").Not()).
2354+
// Skip messages already soft-deleted for the current user
2355+
Filter(func(row rdb.Term) any {
2356+
return rdb.Not(row.Field("DeletedFor").Default([]any{}).Contains(
2357+
func(df rdb.Term) any {
2358+
return df.Field("User").Eq(toDel.DeletedFor)
2359+
}))
2360+
}).Update(map[string]any{"DeletedFor": rdb.Row.Field("DeletedFor").
2361+
Default([]any{}).Append(
2362+
&t.SoftDelete{
2363+
User: toDel.DeletedFor,
2364+
DelId: toDel.DelId})}).RunWrite(a.conn)
23262365
if err != nil {
2327-
rdb.DB(a.dbName).Table("dellog").Get(toDel.Id).
2328-
Delete(rdb.DeleteOpts{Durability: "soft", ReturnChanges: false}).RunWrite(a.conn)
2366+
return err
23292367
}
23302368
}
23312369

2370+
// Make log entries. Needed for both hard- and soft-deleting.
2371+
_, err = rdb.DB(a.dbName).Table("dellog").Insert(toDel).RunWrite(a.conn)
23322372
return err
23332373
}
23342374

0 commit comments

Comments
 (0)