Skip to content

Commit e42c845

Browse files
committed
优化并发读写相关代码
1 parent fe6e5ba commit e42c845

File tree

8 files changed

+137
-71
lines changed

8 files changed

+137
-71
lines changed

internal/caches/reader_file.go

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
)
1212

1313
type FileReader struct {
14-
fp *os.File
14+
fp *fsutils.File
1515

1616
openFile *OpenFile
1717
openFileCache *OpenFileCache
@@ -29,7 +29,7 @@ type FileReader struct {
2929
isClosed bool
3030
}
3131

32-
func NewFileReader(fp *os.File) *FileReader {
32+
func NewFileReader(fp *fsutils.File) *FileReader {
3333
return &FileReader{fp: fp}
3434
}
3535

@@ -175,9 +175,7 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error {
175175
var headerSize = this.headerSize
176176

177177
for {
178-
fsutils.ReaderLimiter.Ack()
179178
n, err := this.fp.Read(buf)
180-
fsutils.ReaderLimiter.Release()
181179
if n > 0 {
182180
if n < headerSize {
183181
goNext, e := callback(n)
@@ -239,9 +237,7 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error {
239237
}
240238

241239
for {
242-
fsutils.ReaderLimiter.Ack()
243240
n, err := this.fp.Read(buf)
244-
fsutils.ReaderLimiter.Release()
245241
if n > 0 {
246242
goNext, e := callback(n)
247243
if e != nil {
@@ -272,9 +268,7 @@ func (this *FileReader) Read(buf []byte) (n int, err error) {
272268
return
273269
}
274270

275-
fsutils.ReaderLimiter.Ack()
276271
n, err = this.fp.Read(buf)
277-
fsutils.ReaderLimiter.Release()
278272
if err != nil && err != io.EOF {
279273
_ = this.discard()
280274
}
@@ -306,18 +300,14 @@ func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callba
306300
isOk = true
307301
return ErrInvalidRange
308302
}
309-
fsutils.ReaderLimiter.Ack()
310303
_, err := this.fp.Seek(offset, io.SeekStart)
311-
fsutils.ReaderLimiter.Release()
312304
if err != nil {
313305
return err
314306
}
315307

316308
for {
317309
var n int
318-
fsutils.ReaderLimiter.Ack()
319310
n, err = this.fp.Read(buf)
320-
fsutils.ReaderLimiter.Release()
321311
if n > 0 {
322312
var n2 = int(end-offset) + 1
323313
if n2 <= n {
@@ -363,7 +353,7 @@ func (this *FileReader) ContainsRange(r rangeutils.Range) (r2 rangeutils.Range,
363353

364354
// FP 原始的文件句柄
365355
func (this *FileReader) FP() *os.File {
366-
return this.fp
356+
return this.fp.Raw()
367357
}
368358

369359
func (this *FileReader) Close() error {
@@ -378,18 +368,16 @@ func (this *FileReader) Close() error {
378368
} else {
379369
var cacheMeta = make([]byte, len(this.meta))
380370
copy(cacheMeta, this.meta)
381-
this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp, cacheMeta, this.header, this.LastModified(), this.bodySize))
371+
this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp.Raw(), cacheMeta, this.header, this.LastModified(), this.bodySize))
382372
}
383373
return nil
384374
}
385375

386376
return this.fp.Close()
387377
}
388378

389-
func (this *FileReader) readToBuff(fp *os.File, buf []byte) (ok bool, err error) {
390-
fsutils.ReaderLimiter.Ack()
379+
func (this *FileReader) readToBuff(fp *fsutils.File, buf []byte) (ok bool, err error) {
391380
n, err := fp.Read(buf)
392-
fsutils.ReaderLimiter.Release()
393381
if err != nil {
394382
return false, err
395383
}

internal/caches/reader_partial_file.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
99
"github.com/iwind/TeaGo/types"
1010
"io"
11-
"os"
1211
)
1312

1413
type PartialFileReader struct {
@@ -18,7 +17,7 @@ type PartialFileReader struct {
1817
rangePath string
1918
}
2019

21-
func NewPartialFileReader(fp *os.File) *PartialFileReader {
20+
func NewPartialFileReader(fp *fsutils.File) *PartialFileReader {
2221
return &PartialFileReader{
2322
FileReader: NewFileReader(fp),
2423
rangePath: PartialRangesFilePath(fp.Name()),

internal/caches/storage_file.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -439,12 +439,12 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
439439

440440
var reader Reader
441441
if isPartial {
442-
var partialFileReader = NewPartialFileReader(fp)
442+
var partialFileReader = NewPartialFileReader(fsutils.NewFile(fp, fsutils.FlagRead))
443443
partialFileReader.openFile = openFile
444444
partialFileReader.openFileCache = openFileCache
445445
reader = partialFileReader
446446
} else {
447-
var fileReader = NewFileReader(fp)
447+
var fileReader = NewFileReader(fsutils.NewFile(fp, fsutils.FlagRead))
448448
fileReader.openFile = openFile
449449
fileReader.openFileCache = openFileCache
450450
reader = fileReader
@@ -593,7 +593,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
593593
if existsCacheItem {
594594
readerFp, err := fsutils.OpenFile(tmpPath, os.O_RDONLY, 0444)
595595
if err == nil {
596-
var partialReader = NewPartialFileReader(readerFp)
596+
var partialReader = NewPartialFileReader(fsutils.NewFile(readerFp, fsutils.FlagRead))
597597
err = partialReader.Init()
598598
_ = partialReader.Close()
599599
if err == nil && partialReader.bodyOffset > 0 {
@@ -629,7 +629,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
629629
return nil, ErrServerIsBusy
630630
}
631631
}
632-
writer, err := os.OpenFile(tmpPath, flags, 0666)
632+
fp, err := os.OpenFile(tmpPath, flags, 0666)
633633
if !isFlushing {
634634
fsutils.WriterLimiter.Release()
635635
}
@@ -639,14 +639,16 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
639639

640640
// open file again
641641
fsutils.WriterLimiter.Ack()
642-
writer, err = os.OpenFile(tmpPath, flags, 0666)
642+
fp, err = os.OpenFile(tmpPath, flags, 0666)
643643
fsutils.WriterLimiter.Release()
644644
}
645645
if err != nil {
646646
return nil, err
647647
}
648648
}
649649

650+
var writer = fsutils.NewFile(fp, fsutils.FlagWrite)
651+
650652
var removeOnFailure = true
651653
defer func() {
652654
if err != nil {
@@ -663,7 +665,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
663665
}()
664666

665667
// 尝试锁定,如果锁定失败,则直接返回
668+
fsutils.WriterLimiter.Ack()
666669
err = syscall.Flock(int(writer.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
670+
fsutils.WriterLimiter.Release()
667671
if err != nil {
668672
removeOnFailure = false
669673
return nil, fmt.Errorf("%w (003)", ErrFileIsWriting)
@@ -700,9 +704,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
700704
metaBodySize = bodySize
701705
}
702706

703-
fsutils.WriterLimiter.Ack()
704707
_, err = writer.Write(metaBytes)
705-
fsutils.WriterLimiter.Release()
706708
if err != nil {
707709
return nil, err
708710
}

internal/caches/writer_file.go

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,13 @@ import (
66
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
77
"github.com/iwind/TeaGo/types"
88
"io"
9-
"os"
109
"strings"
1110
"sync"
1211
)
1312

1413
type FileWriter struct {
1514
storage StorageInterface
16-
rawWriter *os.File
15+
rawWriter *fsutils.File
1716
key string
1817

1918
metaHeaderSize int
@@ -26,9 +25,11 @@ type FileWriter struct {
2625
maxSize int64
2726
endFunc func()
2827
once sync.Once
28+
29+
modifiedBytes int
2930
}
3031

31-
func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, maxSize int64, endFunc func()) *FileWriter {
32+
func NewFileWriter(storage StorageInterface, rawWriter *fsutils.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, maxSize int64, endFunc func()) *FileWriter {
3233
return &FileWriter{
3334
storage: storage,
3435
key: key,
@@ -43,9 +44,7 @@ func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, exp
4344

4445
// WriteHeader 写入数据
4546
func (this *FileWriter) WriteHeader(data []byte) (n int, err error) {
46-
fsutils.WriterLimiter.Ack()
4747
n, err = this.rawWriter.Write(data)
48-
fsutils.WriterLimiter.Release()
4948
this.headerSize += int64(n)
5049
if err != nil {
5150
_ = this.Discard()
@@ -79,7 +78,7 @@ func (this *FileWriter) Write(data []byte) (n int, err error) {
7978
var l = len(data)
8079
if l > (2 << 20) {
8180
var offset = 0
82-
const bufferSize = 256 << 10
81+
const bufferSize = 64 << 10
8382
for {
8483
var end = offset + bufferSize
8584
if end > l {
@@ -145,24 +144,18 @@ func (this *FileWriter) Close() error {
145144

146145
err := this.WriteHeaderLength(types.Int(this.headerSize))
147146
if err != nil {
148-
fsutils.WriterLimiter.Ack()
149147
_ = this.rawWriter.Close()
150-
fsutils.WriterLimiter.Release()
151148
_ = fsutils.Remove(path)
152149
return err
153150
}
154151
err = this.WriteBodyLength(this.bodySize)
155152
if err != nil {
156-
fsutils.WriterLimiter.Ack()
157153
_ = this.rawWriter.Close()
158-
fsutils.WriterLimiter.Release()
159154
_ = fsutils.Remove(path)
160155
return err
161156
}
162157

163-
fsutils.WriterLimiter.Ack()
164158
err = this.rawWriter.Close()
165-
fsutils.WriterLimiter.Release()
166159
if err != nil {
167160
_ = fsutils.Remove(path)
168161
} else if strings.HasSuffix(path, FileTmpSuffix) {
@@ -181,9 +174,7 @@ func (this *FileWriter) Discard() error {
181174
this.endFunc()
182175
})
183176

184-
fsutils.WriterLimiter.Ack()
185177
_ = this.rawWriter.Close()
186-
fsutils.WriterLimiter.Release()
187178

188179
err := fsutils.Remove(this.rawWriter.Name())
189180
return err
@@ -211,9 +202,7 @@ func (this *FileWriter) ItemType() ItemType {
211202
}
212203

213204
func (this *FileWriter) write(data []byte) (n int, err error) {
214-
fsutils.WriterLimiter.Ack()
215205
n, err = this.rawWriter.Write(data)
216-
fsutils.WriterLimiter.Release()
217206
this.bodySize += int64(n)
218207

219208
if this.maxSize > 0 && this.bodySize > this.maxSize {
@@ -227,5 +216,6 @@ func (this *FileWriter) write(data []byte) (n int, err error) {
227216
if err != nil {
228217
_ = this.Discard()
229218
}
219+
230220
return
231221
}

0 commit comments

Comments
 (0)