Skip to content

Commit c5575ff

Browse files
committed
refactor(envexec): support file stream in & out directly
1 parent 033790c commit c5575ff

File tree

5 files changed

+189
-87
lines changed

5 files changed

+189
-87
lines changed

cmd/go-judge/stream/file.go

Lines changed: 24 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package stream
22

33
import (
44
"fmt"
5-
"io"
6-
"math"
75
"os"
86

97
"github.com/criyle/go-judge/envexec"
@@ -12,88 +10,75 @@ import (
1210
)
1311

1412
var (
15-
_ worker.CmdFile = &fileStreamIn{}
16-
_ worker.CmdFile = &fileStreamOut{}
17-
_ envexec.ReaderTTY = &fileStreamInReader{}
13+
_ worker.CmdFile = &fileStreamIn{}
14+
_ worker.CmdFile = &fileStreamOut{}
1815
)
1916

2017
type fileStreamIn struct {
18+
stream envexec.FileStreamIn
2119
index int
2220
fd int
23-
r io.ReadCloser
24-
w *io.PipeWriter
25-
tty *os.File
26-
done chan struct{}
2721
hasTTY bool
2822
}
2923

30-
type fileStreamInReader struct {
31-
*io.PipeReader
32-
fi *fileStreamIn
33-
}
34-
35-
func (f *fileStreamInReader) TTY(tty *os.File) {
36-
f.fi.tty = tty
37-
close(f.fi.done)
38-
}
39-
4024
func newFileStreamIn(index, fd int, hasTTY bool) *fileStreamIn {
41-
r, w := io.Pipe()
42-
fi := &fileStreamIn{index: index, fd: fd, w: w, done: make(chan struct{}), hasTTY: hasTTY}
43-
fi.r = &fileStreamInReader{r, fi}
44-
return fi
25+
return &fileStreamIn{
26+
stream: envexec.NewFileStreamIn(),
27+
index: index,
28+
fd: fd,
29+
hasTTY: hasTTY,
30+
}
4531
}
4632

4733
func (f *fileStreamIn) GetTTY() *os.File {
4834
if !f.hasTTY {
4935
return nil
5036
}
51-
<-f.done
52-
return f.tty
37+
return f.stream.WritePipe()
5338
}
5439

5540
func (f *fileStreamIn) Write(b []byte) (int, error) {
56-
return f.w.Write(b)
41+
return f.stream.WritePipe().Write(b)
5742
}
5843

5944
func (f *fileStreamIn) EnvFile(fs filestore.FileStore) (envexec.File, error) {
60-
return envexec.NewFileReader(f.r, true), nil
45+
return f.stream, nil
6146
}
6247

6348
func (f *fileStreamIn) String() string {
6449
return fmt.Sprintf("fileStreamIn:(index:%d,fd:%d)", f.index, f.fd)
6550
}
6651

6752
func (f *fileStreamIn) Close() error {
68-
f.r.Close()
69-
return f.w.Close()
53+
return f.stream.Close()
7054
}
7155

7256
type fileStreamOut struct {
73-
index int
74-
fd int
75-
r *io.PipeReader
76-
w *io.PipeWriter
57+
stream envexec.FileStreamOut
58+
index int
59+
fd int
7760
}
7861

7962
func newFileStreamOut(index, fd int) *fileStreamOut {
80-
r, w := io.Pipe()
81-
return &fileStreamOut{index: index, fd: fd, r: r, w: w}
63+
return &fileStreamOut{
64+
stream: envexec.NewFileStreamOut(),
65+
index: index,
66+
fd: fd,
67+
}
8268
}
8369

8470
func (f *fileStreamOut) Read(b []byte) (int, error) {
85-
return f.r.Read(b)
71+
return f.stream.ReadPipe().Read(b)
8672
}
8773

8874
func (f *fileStreamOut) EnvFile(fs filestore.FileStore) (envexec.File, error) {
89-
return envexec.NewFileWriter(f.w, envexec.Size(math.MaxInt32)), nil
75+
return f.stream, nil
9076
}
9177

9278
func (f *fileStreamOut) String() string {
9379
return fmt.Sprintf("fileStreamOut:(index:%d,fd:%d)", f.index, f.fd)
9480
}
9581

9682
func (f *fileStreamOut) Close() error {
97-
f.w.Close()
98-
return f.r.Close()
83+
return f.stream.Close()
9984
}

cmd/go-judge/stream/stream.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ func Start(baseCtx context.Context, s Stream, w worker.Worker, srcPrefix []strin
127127

128128
cancel()
129129
closeFunc()
130-
streamOut = nil
131130
wg.Wait()
132131
return err
133132
}

envexec/file.go

Lines changed: 127 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@ import (
44
"fmt"
55
"io"
66
"os"
7+
"sync"
8+
)
9+
10+
var (
11+
_ File = &FileReader{}
12+
_ File = &fileStreamIn{}
13+
_ File = &fileStreamOut{}
14+
_ File = &FileInput{}
15+
_ File = &FileCollector{}
16+
_ File = &FileWriter{}
17+
_ File = &FileOpened{}
718
)
819

920
// File defines interface of envexec files
@@ -20,16 +31,98 @@ type FileReader struct {
2031

2132
func (*FileReader) isFile() {}
2233

23-
// NewFileReader creates File input which can be fully read before exec
24-
// or piped into exec
25-
func NewFileReader(r io.Reader, s bool) File {
26-
return &FileReader{Reader: r, Stream: s}
34+
// NewFileReader creates File input which can be fully read before exec.
35+
// If pipe is required, use the FileStream to get the write end of pipe instead
36+
func NewFileReader(r io.Reader) File {
37+
return &FileReader{Reader: r}
38+
}
39+
40+
// FileStreamIn represent a input streaming pipe and the streamer is able to write
41+
// to the write end of the pipe after pipe created. It is the callers
42+
// responsibility to close the WritePipe
43+
type FileStreamIn interface {
44+
File
45+
Done() <-chan struct{}
46+
WritePipe() *os.File
47+
Close() error
48+
}
49+
50+
type fileStreamIn struct {
51+
done chan struct{}
52+
w *sharedFile
53+
}
54+
55+
func NewFileStreamIn() FileStreamIn {
56+
return &fileStreamIn{
57+
done: make(chan struct{}),
58+
}
59+
}
60+
61+
func (*fileStreamIn) isFile() {}
62+
63+
func (f *fileStreamIn) start(w *sharedFile) {
64+
f.w = w
65+
close(f.done)
66+
}
67+
68+
func (f *fileStreamIn) Done() <-chan struct{} {
69+
return f.done
70+
}
71+
72+
func (f *fileStreamIn) WritePipe() *os.File {
73+
<-f.done
74+
return f.w.f
75+
}
76+
77+
func (f *fileStreamIn) Close() error {
78+
if f.w != nil {
79+
return f.w.Close()
80+
}
81+
return nil
82+
}
83+
84+
// FileStreamOut represent a out streaming pipe and the streamer is able to read
85+
// to the read end of the pipe after pipe created. It is the callers
86+
// responsibility to close the ReadPipe
87+
type FileStreamOut interface {
88+
File
89+
Done() <-chan struct{}
90+
ReadPipe() *os.File
91+
Close() error
2792
}
2893

29-
// ReaderTTY will be asserts when File Reader is provided and TTY is enabled
30-
// and then TTY will be called with pty file
31-
type ReaderTTY interface {
32-
TTY(*os.File)
94+
type fileStreamOut struct {
95+
done chan struct{}
96+
r *sharedFile
97+
}
98+
99+
func NewFileStreamOut() FileStreamOut {
100+
return &fileStreamOut{
101+
done: make(chan struct{}),
102+
}
103+
}
104+
105+
func (*fileStreamOut) isFile() {}
106+
107+
func (f *fileStreamOut) start(r *sharedFile) {
108+
f.r = r
109+
close(f.done)
110+
}
111+
112+
func (f *fileStreamOut) Done() <-chan struct{} {
113+
return f.done
114+
}
115+
116+
func (f *fileStreamOut) ReadPipe() *os.File {
117+
<-f.done
118+
return f.r.f
119+
}
120+
121+
func (f *fileStreamOut) Close() error {
122+
if f.r != nil {
123+
return f.r.Close()
124+
}
125+
return nil
33126
}
34127

35128
// FileInput represent file input which will be opened in read-only mode
@@ -104,3 +197,29 @@ func FileToReader(f File) (io.ReadCloser, error) {
104197
return nil, fmt.Errorf("file cannot open as reader: %T", f)
105198
}
106199
}
200+
201+
type sharedFile struct {
202+
mu sync.Mutex
203+
f *os.File
204+
count int
205+
}
206+
207+
func newShreadFile(f *os.File) *sharedFile {
208+
return &sharedFile{f: f, count: 0}
209+
}
210+
211+
func (s *sharedFile) Acquire() {
212+
s.mu.Lock()
213+
defer s.mu.Unlock()
214+
s.count++
215+
}
216+
217+
func (s *sharedFile) Close() error {
218+
s.mu.Lock()
219+
defer s.mu.Unlock()
220+
s.count--
221+
if s.count == 0 {
222+
return s.f.Close()
223+
}
224+
return nil
225+
}

0 commit comments

Comments
 (0)