Skip to content

Commit 3203c07

Browse files
authored
Merge pull request kubernetes-sigs#1605 from vincepri/flock-nb
🐛 Use non blocking file locking for flock library
2 parents 3c78540 + f90348f commit 3203c07

File tree

3 files changed

+49
-13
lines changed

3 files changed

+49
-13
lines changed

pkg/internal/flock/errors.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package flock
18+
19+
import "errors"
20+
21+
var (
22+
// ErrAlreadyLocked is returned when the file is already locked.
23+
ErrAlreadyLocked = errors.New("the file is already locked")
24+
)

pkg/internal/flock/flock_unix.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,30 @@ limitations under the License.
1818

1919
package flock
2020

21-
import "golang.org/x/sys/unix"
21+
import (
22+
"errors"
23+
"fmt"
24+
"os"
25+
26+
"golang.org/x/sys/unix"
27+
)
2228

2329
// Acquire acquires a lock on a file for the duration of the process. This method
2430
// is reentrant.
2531
func Acquire(path string) error {
2632
fd, err := unix.Open(path, unix.O_CREAT|unix.O_RDWR|unix.O_CLOEXEC, 0600)
2733
if err != nil {
34+
if errors.Is(err, os.ErrExist) {
35+
return fmt.Errorf("cannot lock file %q: %w", path, ErrAlreadyLocked)
36+
}
2837
return err
2938
}
3039

3140
// We don't need to close the fd since we should hold
3241
// it until the process exits.
33-
34-
return unix.Flock(fd, unix.LOCK_EX)
42+
err = unix.Flock(fd, unix.LOCK_NB|unix.LOCK_EX)
43+
if errors.Is(err, unix.EWOULDBLOCK) { // This condition requires LOCK_NB.
44+
return fmt.Errorf("cannot lock file %q: %w", path, ErrAlreadyLocked)
45+
}
46+
return err
3547
}

pkg/internal/testing/addr/manager.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package addr
1818

1919
import (
20+
"errors"
2021
"fmt"
2122
"io/fs"
2223
"net"
@@ -31,7 +32,7 @@ import (
3132
// TODO(directxman12): interface / release functionality for external port managers
3233

3334
const (
34-
portReserveTime = 10 * time.Minute
35+
portReserveTime = 2 * time.Minute
3536
portConflictRetry = 100
3637
portFilePrefix = "port-"
3738
)
@@ -76,7 +77,8 @@ func (c *portCache) add(port int) (bool, error) {
7677
return false, err
7778
}
7879
// Try allocating new port, by acquiring a file.
79-
if err := flock.Acquire(fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)); os.IsExist(err) {
80+
path := fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)
81+
if err := flock.Acquire(path); errors.Is(err, flock.ErrAlreadyLocked) {
8082
return false, nil
8183
} else if err != nil {
8284
return false, err
@@ -86,22 +88,19 @@ func (c *portCache) add(port int) (bool, error) {
8688

8789
var cache = &portCache{}
8890

89-
func suggest(listenHost string) (int, string, error) {
91+
func suggest(listenHost string) (*net.TCPListener, int, string, error) {
9092
if listenHost == "" {
9193
listenHost = "localhost"
9294
}
9395
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(listenHost, "0"))
9496
if err != nil {
95-
return -1, "", err
97+
return nil, -1, "", err
9698
}
9799
l, err := net.ListenTCP("tcp", addr)
98100
if err != nil {
99-
return -1, "", err
101+
return nil, -1, "", err
100102
}
101-
if err := l.Close(); err != nil {
102-
return -1, "", err
103-
}
104-
return l.Addr().(*net.TCPAddr).Port,
103+
return l, l.Addr().(*net.TCPAddr).Port,
105104
addr.IP.String(),
106105
nil
107106
}
@@ -112,10 +111,11 @@ func suggest(listenHost string) (int, string, error) {
112111
// allocated within 1 minute.
113112
func Suggest(listenHost string) (int, string, error) {
114113
for i := 0; i < portConflictRetry; i++ {
115-
port, resolvedHost, err := suggest(listenHost)
114+
listener, port, resolvedHost, err := suggest(listenHost)
116115
if err != nil {
117116
return -1, "", err
118117
}
118+
defer listener.Close()
119119
if ok, err := cache.add(port); ok {
120120
return port, resolvedHost, nil
121121
} else if err != nil {

0 commit comments

Comments
 (0)