-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy pathspaces_table.go
169 lines (153 loc) · 4.48 KB
/
spaces_table.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package state
import (
"fmt"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/matrix-org/sliding-sync/sqlutil"
"github.com/tidwall/gjson"
)
// Values stored in SpaceRelation.Relation (and therefore in the "relation" column of
// syncv3_spaces) recording which kind of event a relation was derived from.
const (
// RelationMSpaceParent marks a relation built from an m.space.parent event.
RelationMSpaceParent = 1
// RelationMSpaceChild marks a relation built from an m.space.child event.
RelationMSpaceChild = 2
)
// SpaceRelation is one edge of the space graph, as stored in a row of syncv3_spaces.
//
// Parent and Child identify the two ends of the edge. Relation is one of the
// RelationMSpace* constants and records which event type produced the edge.
// Ordering and IsSuggested are copied from the content of m.space.child events;
// Ordering is "" when unset.
type SpaceRelation struct {
	Parent      string `db:"parent"`
	Child       string `db:"child"`
	Relation    int    `db:"relation"`
	Ordering    string `db:"ordering"`
	IsSuggested bool   `db:"suggested"`
}

// Key returns a string of the form "<parent>-<child>-<relation>" identifying this
// relation by the same triple that the table declares UNIQUE.
func (sr *SpaceRelation) Key() string {
	relation := fmt.Sprint(sr.Relation)
	return sr.Parent + "-" + sr.Child + "-" + relation
}
// NewSpaceRelationFromEvent converts an m.space.child or m.space.parent state event
// into a SpaceRelation.
//
// It returns (nil, false) when the event JSON has no "state_key" or when the event
// type is neither of the two space event types. Otherwise it returns the relation
// together with isDeleted, which is true when "content.via" is not a JSON array.
//
// For m.space.child the event's room is the parent and the state key is the child;
// for m.space.parent the roles are swapped.
func NewSpaceRelationFromEvent(ev Event) (sr *SpaceRelation, isDeleted bool) {
	parsed := gjson.ParseBytes(ev.JSON)
	if hasStateKey := parsed.Get("state_key").Exists(); !hasStateKey {
		return nil, false
	}
	if ev.Type != "m.space.child" && ev.Type != "m.space.parent" {
		return nil, false
	}

	// Both event types share the same deletion rule: no array-valued "via".
	isDeleted = !parsed.Get("content.via").IsArray()

	if ev.Type == "m.space.child" {
		sr = &SpaceRelation{
			Parent:      ev.RoomID,
			Child:       ev.StateKey,
			Relation:    RelationMSpaceChild,
			Ordering:    parsed.Get("content.ordering").Str,
			IsSuggested: parsed.Get("content.suggested").Bool(),
		}
		return sr, isDeleted
	}

	// m.space.parent: the content's "canonical" field is not recorded, so Ordering
	// and IsSuggested keep their zero values.
	sr = &SpaceRelation{
		Parent:   ev.StateKey,
		Child:    ev.RoomID,
		Relation: RelationMSpaceParent,
	}
	return sr, isDeleted
}
// SpacesTable stores the space graph for all users.
// The type itself carries no fields; its methods are handed the transaction to run
// against and operate on the syncv3_spaces table.
type SpacesTable struct{}
// NewSpacesTable ensures the syncv3_spaces table exists and returns a SpacesTable.
// The DDL is run through MustExec, so there is no error return from this constructor.
func NewSpacesTable(db *sqlx.DB) *SpacesTable {
	// One row per (parent, child, relation); that UNIQUE triple is also the
	// ON CONFLICT target used when upserting rows.
	const schema = `
CREATE TABLE IF NOT EXISTS syncv3_spaces (
parent TEXT NOT NULL,
child TEXT NOT NULL,
relation SMALLINT NOT NULL,
suggested BOOL NOT NULL,
ordering TEXT NOT NULL, -- "" for unset
UNIQUE(parent, child, relation)
);
`
	db.MustExec(schema)
	return new(SpacesTable)
}
// BulkInsert upserts relations into syncv3_spaces, keyed by (parent, child, relation).
// When a row with the same key already exists, its ordering and suggested columns are
// overwritten with the new values. An empty input is a no-op.
//
// The first error from the database aborts the remaining chunks and is returned as-is.
func (t *SpacesTable) BulkInsert(txn *sqlx.Tx, relations []SpaceRelation) error {
	const upsert = `
INSERT INTO syncv3_spaces (parent, child, relation, ordering, suggested)
VALUES (:parent, :child, :relation, :ordering, :suggested) ON CONFLICT (parent, child, relation) DO UPDATE SET ordering = EXCLUDED.ordering, suggested = EXCLUDED.suggested`

	if len(relations) == 0 {
		return nil
	}
	// 5 matches the number of named parameters bound per row above; presumably
	// Chunkify uses it to keep each statement under MaxPostgresParameters —
	// confirm against sqlutil.Chunkify.
	batches := sqlutil.Chunkify(5, MaxPostgresParameters, SpaceRelationChunker(relations))
	for i := range batches {
		if _, err := txn.NamedExec(upsert, batches[i]); err != nil {
			return err
		}
	}
	return nil
}
// BulkDelete removes every row of syncv3_spaces whose (parent, child, relation)
// matches one of the given relations. It issues one DELETE statement per relation
// and returns the first database error encountered, leaving later relations
// unprocessed. An empty input performs no statements.
func (t *SpacesTable) BulkDelete(txn *sqlx.Tx, relations []SpaceRelation) error {
	const stmt = `DELETE FROM syncv3_spaces WHERE parent=$1 AND child=$2 AND relation=$3`

	for i := range relations {
		rel := &relations[i]
		if _, err := txn.Exec(stmt, rel.Parent, rel.Child, rel.Relation); err != nil {
			return err
		}
	}
	return nil
}
// SelectChildren loads every stored relation whose parent is one of the given spaces
// and groups them by parent. The query does not filter on the relation column, so
// rows of both relation kinds are returned.
//
// On success the map is always non-nil; spaces without any rows have no entry.
// On a query error it returns (nil, err).
func (t *SpacesTable) SelectChildren(txn *sqlx.Tx, spaces []string) (map[string][]SpaceRelation, error) {
	const query = `SELECT parent, child, relation, ordering, suggested FROM syncv3_spaces WHERE parent = ANY($1)`

	var rows []SpaceRelation
	if err := txn.Select(&rows, query, pq.StringArray(spaces)); err != nil {
		return nil, err
	}

	// Group the flat result set by parent space.
	byParent := make(map[string][]SpaceRelation)
	for i := range rows {
		parent := rows[i].Parent
		byParent[parent] = append(byParent[parent], rows[i])
	}
	return byParent, nil
}
// HandleSpaceUpdates applies the space relations described by events to the
// syncv3_spaces table inside txn.
//
// Events that NewSpaceRelationFromEvent does not recognise are ignored. When several
// events describe the same (parent, child, relation) triple, only the one appearing
// last in events is applied, so repeated add/remove of the same relation resolves to
// its final state. This assumes events is ordered oldest to newest — confirm with
// callers.
//
// Relations to add are upserted first, then deleted relations are removed. Errors
// from either step are wrapped, so the underlying database error remains reachable
// through errors.Is / errors.As.
func (t *SpacesTable) HandleSpaceUpdates(txn *sqlx.Tx, events []Event) error {
	// change pairs a relation with whether the latest event removed it.
	type change struct {
		relation  *SpaceRelation
		isDeleted bool
	}

	// Keyed by SpaceRelation.Key so a later event overwrites an earlier one.
	latest := make(map[string]change)
	for _, ev := range events {
		r, isDeleted := NewSpaceRelationFromEvent(ev)
		if r == nil {
			continue
		}
		latest[r.Key()] = change{relation: r, isDeleted: isDeleted}
	}

	// Split into additions and removals for the bulk operations.
	var added, removed []SpaceRelation
	for _, c := range latest {
		if c.isDeleted {
			removed = append(removed, *c.relation)
		} else {
			added = append(added, *c.relation)
		}
	}

	// Update the database. Each key appears in exactly one of the two slices, so the
	// insert and delete steps never touch the same row.
	if err := t.BulkInsert(txn, added); err != nil {
		return fmt.Errorf("failed to BulkInsert: %w", err)
	}
	if err := t.BulkDelete(txn, removed); err != nil {
		return fmt.Errorf("failed to BulkDelete: %w", err)
	}
	return nil
}
// SpaceRelationChunker wraps a slice of SpaceRelation with the Len and Subslice
// methods so that it can be handed to sqlutil.Chunkify.
type SpaceRelationChunker []SpaceRelation

// Len returns the number of relations held.
func (rels SpaceRelationChunker) Len() int { return len(rels) }

// Subslice returns the relations at indexes [i, j) as a Chunker. The result shares
// its backing array with the receiver.
func (rels SpaceRelationChunker) Subslice(i, j int) sqlutil.Chunker {
	return rels[i:j]
}