|
6 | 6 | "compress/gzip" |
7 | 7 | "context" |
8 | 8 | "encoding/json" |
| 9 | + "errors" |
9 | 10 | "fmt" |
10 | 11 | "github.com/viant/afs" |
11 | 12 | "github.com/viant/datly/utils/types" |
@@ -42,56 +43,92 @@ func (l *LoadData) Exec(ctx context.Context, session handler.Session) (interface |
42 | 43 | if !ok || err != nil { |
43 | 44 | return nil, fmt.Errorf("invalid Loader URL: %w", err) |
44 | 45 | } |
| 46 | + |
45 | 47 | var URL string |
46 | | - switch URLValue.(type) { |
| 48 | + switch v := URLValue.(type) { |
47 | 49 | case string: |
48 | | - URL = URLValue.(string) |
| 50 | + URL = v |
49 | 51 | case *string: |
50 | | - URL = *URLValue.(*string) |
| 52 | + URL = *v |
51 | 53 | default: |
52 | | - return nil, fmt.Errorf("invalid Loader URL: expected %T, but had %T", URL, URLValue) |
| 54 | + return nil, fmt.Errorf("invalid Loader URL: expected %T, but had %T", "", URLValue) |
53 | 55 | } |
54 | 56 |
|
| 57 | + // Prefer .gz if the plain URL doesn't exist. |
55 | 58 | if ok, _ := l.fs.Exists(ctx, URL); !ok { |
56 | 59 | if ok, _ := l.fs.Exists(ctx, URL+".gz"); ok { |
57 | 60 | URL += ".gz" |
58 | 61 | } |
59 | 62 | } |
60 | 63 |
|
61 | | - isCompressed := strings.HasSuffix(URL, ".gz") |
| 64 | + // Download compressed or plain bytes (API returns []byte). |
62 | 65 | data, err := l.fs.DownloadWithURL(ctx, URL) |
63 | 66 | if err != nil { |
64 | 67 | return nil, fmt.Errorf("failed to load URL: %w", err) |
65 | 68 | } |
66 | | - if isCompressed { |
67 | | - reader, err := gzip.NewReader(bytes.NewReader(data)) |
| 69 | + |
| 70 | + // Build a streaming reader chain; avoid io.ReadAll on gzip. |
| 71 | + var r io.Reader = bytes.NewReader(data) |
| 72 | + if strings.HasSuffix(URL, ".gz") { |
| 73 | + gzr, err := gzip.NewReader(r) |
68 | 74 | if err != nil { |
69 | 75 | return nil, fmt.Errorf("failed to decompress URL: failed to create reader: %w (used URL: %s)", err, URL) |
70 | 76 | } |
71 | | - defer reader.Close() |
72 | | - if data, err = io.ReadAll(reader); err != nil { |
73 | | - return nil, fmt.Errorf("failed to decompress URL:%w (used URL: %s)", err, URL) |
74 | | - } |
| 77 | + defer gzr.Close() |
| 78 | + r = gzr |
75 | 79 | } |
| 80 | + |
| 81 | + br := bufio.NewReaderSize(r, 1<<20) // read-ahead; does NOT cap JSON size |
| 82 | + dec := json.NewDecoder(br) |
| 83 | + dec.UseNumber() |
| 84 | + |
| 85 | + // Output slice + appender (kept from your original design) |
76 | 86 | itemType := l.Options.OutputType.Elem() |
77 | 87 | xSlice := xunsafe.NewSlice(l.Options.OutputType) |
78 | | - scanner := bufio.NewScanner(bytes.NewReader(data)) |
79 | 88 | response := reflect.New(l.Options.OutputType).Interface() |
80 | 89 | appender := xSlice.Appender(xunsafe.AsPointer(response)) |
81 | | - scanner.Buffer(make([]byte, 1024*1024), 5*1024*1024) |
82 | | - for scanner.Scan() { |
83 | | - line := scanner.Bytes() |
84 | | - if len(line) == 0 { |
85 | | - continue |
| 90 | + |
| 91 | + // Reject top-level arrays to keep the code simple (no streaming array parsing). |
| 92 | + first, err := peekFirstNonSpace(br) |
| 93 | + if err != nil { |
| 94 | + if errors.Is(err, io.EOF) { |
| 95 | + return response, nil // empty file -> empty slice |
86 | 96 | } |
87 | | - item := types.NewValue(itemType) |
88 | | - err := json.Unmarshal(scanner.Bytes(), item) |
89 | | - if err != nil { |
90 | | - return nil, fmt.Errorf("invalid item: %w, %s", err, line) |
| 97 | + return nil, fmt.Errorf("read error: %w", err) |
| 98 | + } |
| 99 | + if first == '[' { |
| 100 | + return nil, fmt.Errorf("top-level JSON arrays are not supported; provide NDJSON (one object per line) or a single JSON object") |
| 101 | + } |
| 102 | + // Put the byte back so the decoder sees it. |
| 103 | + _ = br.UnreadByte() |
| 104 | + |
| 105 | + // Decode one value per call: supports single object or NDJSON. |
| 106 | + for { |
| 107 | + item := types.NewValue(itemType) // pointer to zero value of element type |
| 108 | + if err := dec.Decode(item); err != nil { |
| 109 | + if errors.Is(err, io.EOF) { |
| 110 | + break |
| 111 | + } |
| 112 | + return nil, fmt.Errorf("invalid item: %w", err) |
91 | 113 | } |
92 | 114 | appender.Append(item) |
93 | 115 | } |
94 | | - return response, scanner.Err() |
| 116 | + |
| 117 | + return response, nil |
| 118 | +} |
| 119 | + |
// peekFirstNonSpace returns the first byte in br that is not JSON
// insignificant whitespace (space, '\n', '\r' or '\t').
//
// Leading whitespace is discarded from br, but the returned byte is NOT
// consumed: it is still the next byte br yields, so a decoder reading from br
// afterwards sees the complete value. Callers therefore do not need to call
// UnreadByte; doing so anyway is harmless, because bufio rejects UnreadByte
// after Peek/Discard and leaves the read position untouched.
//
// The underlying read error is returned unwrapped. In particular io.EOF is
// returned when br is empty or holds only whitespace, so callers can compare
// against it directly.
func peekFirstNonSpace(br *bufio.Reader) (byte, error) {
	for {
		next, err := br.Peek(1)
		if err != nil {
			return 0, err
		}
		switch next[0] {
		case ' ', '\n', '\r', '\t':
			// Skip whitespace. The byte is already buffered by Peek, so
			// Discard(1) cannot fail here.
			if _, err := br.Discard(1); err != nil {
				return 0, err
			}
		default:
			return next[0], nil
		}
	}
}
96 | 133 |
|
97 | 134 | func (*LoadDataProvider) New(ctx context.Context, opts ...handler.Option) (handler.Handler, error) { |
|
0 commit comments