Skip to content

Commit 5668e31

Browse files
authored
atproto/lexicon: package for working with lexicon schemas, and runtime data validation (#420)
This is currently a branch on top of #407 - [x] parse lexicon schema JSON - [x] load entire directories of schema JSON files from disk as a catalog - [x] check lexicon schema semantics (eg, can't have min greater than max) - [x] validate runtime data (`map[string]any`) against lexicons - [x] whole bunch of corner-case tests - [x] CLI tool for some live-network testing - [x] add support for `tid` and `record-key` lex formats (not in specs yet) - [x] configurable flexible to legacy blobs and lenient datetime parsing (?) - [x] comments and example code probably in a later iteration: - [ ] ensure empty body works (bluesky-social/atproto#2746) - [ ] validate rkey type against lexicon - [ ] CLI tool to validate prod firehose - [ ] CLI tool to validate CAR files - [x] clarify specs around unions: only `object` and `token` types? - [x] clarify specs around `unknown`: only `object` type? - [ ] validate other "primary" lexicon types: subscription, HTTP body, HTTP URL params, etc
2 parents 7e7ac23 + 2215e75 commit 5668e31

23 files changed

+2567
-3
lines changed

atproto/data/parse.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,18 @@ func parseMap(obj map[string]any) (any, error) {
111111
return nil, fmt.Errorf("$type field must contain a non-empty string")
112112
}
113113
}
114+
// legacy blob type
115+
if len(obj) == 2 {
116+
if _, ok := obj["mimeType"]; ok {
117+
if _, ok := obj["cid"]; ok {
118+
b, err := parseLegacyBlob(obj)
119+
if err != nil {
120+
return nil, err
121+
}
122+
return *b, nil
123+
}
124+
}
125+
}
114126
out := make(map[string]any, len(obj))
115127
for k, val := range obj {
116128
if len(k) > MAX_OBJECT_KEY_LEN {
@@ -213,6 +225,30 @@ func parseBlob(obj map[string]any) (*Blob, error) {
213225
}, nil
214226
}
215227

228+
func parseLegacyBlob(obj map[string]any) (*Blob, error) {
229+
if len(obj) != 2 {
230+
return nil, fmt.Errorf("legacy blobs expected to have 2 fields")
231+
}
232+
var err error
233+
mimeType, ok := obj["mimeType"].(string)
234+
if !ok {
235+
return nil, fmt.Errorf("blob 'mimeType' missing or not a string")
236+
}
237+
cidStr, ok := obj["cid"]
238+
if !ok {
239+
return nil, fmt.Errorf("blob 'cid' missing")
240+
}
241+
c, err := cid.Parse(cidStr)
242+
if err != nil {
243+
return nil, fmt.Errorf("invalid CID: %w", err)
244+
}
245+
return &Blob{
246+
Size: -1,
247+
MimeType: mimeType,
248+
Ref: CIDLink(c),
249+
}, nil
250+
}
251+
216252
func parseObject(obj map[string]any) (map[string]any, error) {
217253
out, err := parseMap(obj)
218254
if err != nil {

atproto/identity/doc.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
/*
22
Package identity provides types and routines for resolving handles and DIDs from the network
33
4-
The two main abstractions are a Catalog interface for identity service implementations, and an Identity structure which represents core identity information relevant to atproto. The Catalog interface can be nested, somewhat like HTTP middleware, to provide caching, observability, or other bespoke needs in more complex systems.
5-
6-
Much of the implementation of this SDK is based on existing code in indigo:api/extra.go
4+
The two main abstractions are a Directory interface for identity service implementations, and an Identity struct which represents core identity information relevant to atproto. The Directory interface can be nested, somewhat like HTTP middleware, to provide caching, observability, or other bespoke needs in more complex systems.
75
*/
86
package identity

atproto/lexicon/catalog.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package lexicon
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io"
7+
"io/fs"
8+
"log/slog"
9+
"os"
10+
"path/filepath"
11+
"strings"
12+
)
13+
14+
// Interface type for a resolver or container of lexicon schemas, and methods for validating generic data against those schemas.
15+
type Catalog interface {
16+
// Looks up a schema refrence (NSID string with optional fragment) to a Schema object.
17+
Resolve(ref string) (*Schema, error)
18+
}
19+
20+
// Trivial in-memory Lexicon Catalog implementation.
21+
type BaseCatalog struct {
22+
schemas map[string]Schema
23+
}
24+
25+
// Creates a new empty BaseCatalog
26+
func NewBaseCatalog() BaseCatalog {
27+
return BaseCatalog{
28+
schemas: make(map[string]Schema),
29+
}
30+
}
31+
32+
func (c *BaseCatalog) Resolve(ref string) (*Schema, error) {
33+
if ref == "" {
34+
return nil, fmt.Errorf("tried to resolve empty string name")
35+
}
36+
// default to #main if name doesn't have a fragment
37+
if !strings.Contains(ref, "#") {
38+
ref = ref + "#main"
39+
}
40+
s, ok := c.schemas[ref]
41+
if !ok {
42+
return nil, fmt.Errorf("schema not found in catalog: %s", ref)
43+
}
44+
return &s, nil
45+
}
46+
47+
// Inserts a schema loaded from a JSON file in to the catalog.
48+
func (c *BaseCatalog) AddSchemaFile(sf SchemaFile) error {
49+
base := sf.ID
50+
for frag, def := range sf.Defs {
51+
if len(frag) == 0 || strings.Contains(frag, "#") || strings.Contains(frag, ".") {
52+
// TODO: more validation here?
53+
return fmt.Errorf("schema name invalid: %s", frag)
54+
}
55+
name := base + "#" + frag
56+
if _, ok := c.schemas[name]; ok {
57+
return fmt.Errorf("catalog already contained a schema with name: %s", name)
58+
}
59+
// "A file can have at most one definition with one of the "primary" types. Primary types should always have the name main. It is possible for main to describe a non-primary type."
60+
switch s := def.Inner.(type) {
61+
case SchemaRecord, SchemaQuery, SchemaProcedure, SchemaSubscription:
62+
if frag != "main" {
63+
return fmt.Errorf("record, query, procedure, and subscription types must be 'main', not: %s", frag)
64+
}
65+
case SchemaToken:
66+
// add fully-qualified name to token
67+
s.fullName = name
68+
def.Inner = s
69+
}
70+
def.SetBase(base)
71+
if err := def.CheckSchema(); err != nil {
72+
return err
73+
}
74+
s := Schema{
75+
ID: name,
76+
Revision: sf.Revision,
77+
Def: def.Inner,
78+
}
79+
c.schemas[name] = s
80+
}
81+
return nil
82+
}
83+
84+
// Recursively loads all '.json' files from a directory in to the catalog.
85+
func (c *BaseCatalog) LoadDirectory(dirPath string) error {
86+
return filepath.WalkDir(dirPath, func(p string, d fs.DirEntry, err error) error {
87+
if err != nil {
88+
return err
89+
}
90+
if d.IsDir() {
91+
return nil
92+
}
93+
if !strings.HasSuffix(p, ".json") {
94+
return nil
95+
}
96+
slog.Debug("loading Lexicon schema file", "path", p)
97+
f, err := os.Open(p)
98+
if err != nil {
99+
return err
100+
}
101+
defer func() { _ = f.Close() }()
102+
103+
b, err := io.ReadAll(f)
104+
if err != nil {
105+
return err
106+
}
107+
108+
var sf SchemaFile
109+
if err = json.Unmarshal(b, &sf); err != nil {
110+
return err
111+
}
112+
if err = c.AddSchemaFile(sf); err != nil {
113+
return err
114+
}
115+
return nil
116+
})
117+
}

atproto/lexicon/cmd/lextool/main.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io"
7+
"log/slog"
8+
"os"
9+
10+
"github.com/bluesky-social/indigo/atproto/lexicon"
11+
12+
"github.com/urfave/cli/v2"
13+
)
14+
15+
func main() {
16+
app := cli.App{
17+
Name: "lex-tool",
18+
Usage: "informal debugging CLI tool for atproto lexicons",
19+
}
20+
app.Commands = []*cli.Command{
21+
&cli.Command{
22+
Name: "parse-schema",
23+
Usage: "parse an individual lexicon schema file (JSON)",
24+
Action: runParseSchema,
25+
},
26+
&cli.Command{
27+
Name: "load-directory",
28+
Usage: "try recursively loading all the schemas from a directory",
29+
Action: runLoadDirectory,
30+
},
31+
&cli.Command{
32+
Name: "validate-record",
33+
Usage: "fetch from network, validate against catalog",
34+
Action: runValidateRecord,
35+
},
36+
&cli.Command{
37+
Name: "validate-firehose",
38+
Usage: "subscribe to a firehose, validate every known record against catalog",
39+
Action: runValidateFirehose,
40+
},
41+
}
42+
h := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})
43+
slog.SetDefault(slog.New(h))
44+
app.RunAndExitOnError()
45+
}
46+
47+
func runParseSchema(cctx *cli.Context) error {
48+
p := cctx.Args().First()
49+
if p == "" {
50+
return fmt.Errorf("need to provide path to a schema file as an argument")
51+
}
52+
53+
f, err := os.Open(p)
54+
if err != nil {
55+
return err
56+
}
57+
defer func() { _ = f.Close() }()
58+
59+
b, err := io.ReadAll(f)
60+
if err != nil {
61+
return err
62+
}
63+
64+
var sf lexicon.SchemaFile
65+
if err := json.Unmarshal(b, &sf); err != nil {
66+
return err
67+
}
68+
out, err := json.MarshalIndent(sf, "", " ")
69+
if err != nil {
70+
return err
71+
}
72+
fmt.Println(string(out))
73+
return nil
74+
}
75+
76+
func runLoadDirectory(cctx *cli.Context) error {
77+
p := cctx.Args().First()
78+
if p == "" {
79+
return fmt.Errorf("need to provide directory path as an argument")
80+
}
81+
82+
c := lexicon.NewBaseCatalog()
83+
err := c.LoadDirectory(p)
84+
if err != nil {
85+
return err
86+
}
87+
88+
fmt.Println("success!")
89+
return nil
90+
}

atproto/lexicon/cmd/lextool/net.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"log/slog"
8+
"net/http"
9+
10+
"github.com/bluesky-social/indigo/atproto/data"
11+
"github.com/bluesky-social/indigo/atproto/identity"
12+
"github.com/bluesky-social/indigo/atproto/lexicon"
13+
"github.com/bluesky-social/indigo/atproto/syntax"
14+
15+
"github.com/urfave/cli/v2"
16+
)
17+
18+
func runValidateRecord(cctx *cli.Context) error {
19+
ctx := context.Background()
20+
args := cctx.Args().Slice()
21+
if len(args) != 2 {
22+
return fmt.Errorf("expected two args (catalog path and AT-URI)")
23+
}
24+
p := args[0]
25+
if p == "" {
26+
return fmt.Errorf("need to provide directory path as an argument")
27+
}
28+
29+
cat := lexicon.NewBaseCatalog()
30+
err := cat.LoadDirectory(p)
31+
if err != nil {
32+
return err
33+
}
34+
35+
aturi, err := syntax.ParseATURI(args[1])
36+
if err != nil {
37+
return err
38+
}
39+
if aturi.RecordKey() == "" {
40+
return fmt.Errorf("need a full, not partial, AT-URI: %s", aturi)
41+
}
42+
dir := identity.DefaultDirectory()
43+
ident, err := dir.Lookup(ctx, aturi.Authority())
44+
if err != nil {
45+
return fmt.Errorf("resolving AT-URI authority: %v", err)
46+
}
47+
pdsURL := ident.PDSEndpoint()
48+
if pdsURL == "" {
49+
return fmt.Errorf("could not resolve PDS endpoint for AT-URI account: %s", ident.DID.String())
50+
}
51+
52+
slog.Info("fetching record", "did", ident.DID.String(), "collection", aturi.Collection().String(), "rkey", aturi.RecordKey().String())
53+
url := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
54+
pdsURL, ident.DID, aturi.Collection(), aturi.RecordKey())
55+
resp, err := http.Get(url)
56+
if err != nil {
57+
return err
58+
}
59+
if resp.StatusCode != http.StatusOK {
60+
return fmt.Errorf("fetch failed")
61+
}
62+
respBytes, err := io.ReadAll(resp.Body)
63+
if err != nil {
64+
return err
65+
}
66+
67+
body, err := data.UnmarshalJSON(respBytes)
68+
record, ok := body["value"].(map[string]any)
69+
if !ok {
70+
return fmt.Errorf("fetched record was not an object")
71+
}
72+
73+
slog.Info("validating", "did", ident.DID.String(), "collection", aturi.Collection().String(), "rkey", aturi.RecordKey().String())
74+
err = lexicon.ValidateRecord(&cat, record, aturi.Collection().String(), lexicon.LenientMode)
75+
if err != nil {
76+
return err
77+
}
78+
fmt.Println("success!")
79+
return nil
80+
}
81+
82+
func runValidateFirehose(cctx *cli.Context) error {
83+
p := cctx.Args().First()
84+
if p == "" {
85+
return fmt.Errorf("need to provide directory path as an argument")
86+
}
87+
88+
cat := lexicon.NewBaseCatalog()
89+
err := cat.LoadDirectory(p)
90+
if err != nil {
91+
return err
92+
}
93+
94+
return fmt.Errorf("UNIMPLEMENTED")
95+
}

atproto/lexicon/docs.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/*
2+
Package atproto/lexicon provides generic Lexicon schema parsing and run-time validation.
3+
*/
4+
package lexicon

0 commit comments

Comments
 (0)