Skip to content

Commit 5058bbd

Browse files
authored
Merge pull request #4 from yankooo/dev
impl pool
2 parents 4345953 + 73e1c0d commit 5058bbd

File tree

13 files changed

+845
-1
lines changed

13 files changed

+845
-1
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.idea/

.travis.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
language: go
2+
3+
go:
4+
- 1.12.x
5+
6+
before_install:
7+
- go get -t -v ./...
8+
9+
script:
10+
- go test -race -coverprofile=coverage.txt -covermode=atomic
11+
12+
after_success:
13+
- bash <(curl -s https://codecov.io/bash)

LICENSE

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2020 Michael Liu
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

README.md

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,75 @@
11
# wasps
2-
A lightweight Goroutine pool for Go
2+
3+
[![Build Status](https://travis-ci.com/yankooo/wasps.svg?branch=master)](https://travis-ci.com/yankooo/wasps) [![Go Report Card](https://goreportcard.com/badge/github.com/yankooo/wasps)](https://goreportcard.com/report/github.com/yankooo/wasps) [![Codecov](https://img.shields.io/codecov/c/github/yankooo/wasps/master)](https://codecov.io/gh/yankooo/wasps) [![Doc for wasps](https://img.shields.io/badge/go.dev-doc-007d9c?style=flat&logo=appveyor)](https://pkg.go.dev/github.com/yankooo/wasps?tab=doc)
4+
5+
English | [中文](README_ZH.md)
6+
7+
## Introduction
8+
9+
`wasps` is a lightweight goroutine pool that implements scheduling management for multiple goroutines.
10+
11+
## Features:
12+
13+
- Automatic scheduling goroutine.
14+
- Provides commonly-used interfaces: task submission, getting the number of running goroutines, dynamically adjusting the size of the pool, and releasing the pool.
15+
- Provide callback type goroutine pool, serialization work goroutine pool, custom work goroutine pool.
16+
- Support custom work goroutine, support panic processing of task goroutine, and custom pass parameter of closure function.
17+
- Asynchronous mechanism.
18+
19+
## Docs
20+
21+
https://godoc.org/github.com/yankooo/wasps
22+
23+
## Installation
24+
25+
``` go
26+
go get github.com/yankooo/wasps
27+
```
28+
29+
## Use
30+
``` go
31+
package main
32+
33+
import (
34+
"context"
35+
"fmt"
36+
"github.com/yankooo/wasps"
37+
"log"
38+
"sync"
39+
"time"
40+
)
41+
42+
func main() {
43+
pool := wasps.NewCallback(5)
44+
defer func() {
45+
pool.Release()
46+
}()
47+
48+
var num = 10
49+
ctx, _ := context.WithTimeout(context.TODO(), 1*time.Second)
50+
51+
var wg sync.WaitGroup
52+
53+
wg.Add(3)
54+
pool.SubmitWithContext(ctx, func(args ...interface{}) {
55+
defer wg.Done()
56+
num := args[0].(int)
57+
log.Printf("first submit %d", num*2)
58+
}, wasps.WithArgs(num))
59+
60+
pool.Submit(func(args ...interface{}) {
61+
defer wg.Done()
62+
num := args[0].(int)
63+
log.Printf("second submit %d", num)
64+
}, wasps.WithArgs(num), wasps.WithRecoverFn(func(r interface{}) { fmt.Printf("catch panic: %+v\n", r) }))
65+
66+
num = 200
67+
pool.Submit(func(args ...interface{}) {
68+
defer wg.Done()
69+
log.Printf("third submit %d", num)
70+
})
71+
72+
wg.Wait()
73+
}
74+
```
75+

README_ZH.md

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# wasps
2+
3+
[![Build Status](https://travis-ci.com/yankooo/wasps.svg?branch=master)](https://travis-ci.com/yankooo/wasps) [![Go Report Card](https://goreportcard.com/badge/github.com/yankooo/wasps)](https://goreportcard.com/report/github.com/yankooo/wasps) [![Codecov](https://img.shields.io/codecov/c/github/yankooo/wasps/master)](https://codecov.io/gh/yankooo/wasps) [![Doc for wasps](https://img.shields.io/badge/go.dev-doc-007d9c?style=flat&logo=appveyor)](https://pkg.go.dev/github.com/yankooo/wasps?tab=doc)
4+
5+
[英文](README.md) | 中文
6+
7+
## 简介
8+
9+
`wasps`是一个轻量级的 goroutine 池,实现了对多个 goroutine 的调度管理。
10+
11+
## 功能
12+
13+
- 自动调度goroutine。
14+
- 提供了常用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool。
15+
- 提供了回调类型的协程池、串行化工作的协程池、自定义工作协程的协程池。
16+
- 支持自定义工作协程,支持任务协程的panic处理以及闭包函数的自定义传参。
17+
- 异步机制
18+
19+
## 文档
20+
21+
https://godoc.org/github.com/yankooo/wasps
22+
23+
## 安装
24+
25+
``` go
26+
go get github.com/yankooo/wasps
27+
```
28+
29+
## 使用
30+
``` go
31+
package main
32+
33+
import (
34+
"context"
35+
"fmt"
36+
"github.com/yankooo/wasps"
37+
"log"
38+
"sync"
39+
"time"
40+
)
41+
42+
func main() {
43+
pool := wasps.NewCallback(5)
44+
defer func() {
45+
pool.Release()
46+
}()
47+
48+
var num = 10
49+
ctx, _ := context.WithTimeout(context.TODO(), 1*time.Second)
50+
51+
var wg sync.WaitGroup
52+
53+
wg.Add(3)
54+
pool.SubmitWithContext(ctx, func(args ...interface{}) {
55+
defer wg.Done()
56+
num := args[0].(int)
57+
log.Printf("first submit %d", num*2)
58+
}, wasps.WithArgs(num))
59+
60+
pool.Submit(func(args ...interface{}) {
61+
defer wg.Done()
62+
num := args[0].(int)
63+
log.Printf("second submit %d", num)
64+
}, wasps.WithArgs(num), wasps.WithRecoverFn(func(r interface{}) { fmt.Printf("catch panic: %+v\n", r) }))
65+
66+
num = 200
67+
pool.Submit(func(args ...interface{}) {
68+
defer wg.Done()
69+
log.Printf("third submit %d", num)
70+
})
71+
72+
wg.Wait()
73+
}
74+
```

example/custom_worker.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"github.com/yankooo/wasps"
6+
"log"
7+
"time"
8+
)
9+
10+
type customWorker struct {
11+
job chan *wasps.Job
12+
close chan struct{}
13+
}
14+
15+
type customTypeFn func() error
16+
17+
func (c *customWorker) Do(job *wasps.Job) {
18+
f := job.Task.(customTypeFn)
19+
_ = f()
20+
}
21+
22+
func (c *customWorker) JobChan() chan *wasps.Job {
23+
return c.job
24+
}
25+
26+
func (c *customWorker) StopChan() chan struct{} {
27+
return c.close
28+
}
29+
30+
func customWorkerExample() {
31+
pool := wasps.New(5, func() wasps.Worker {
32+
return &customWorker{
33+
job: make(chan *wasps.Job),
34+
close: make(chan struct{}),
35+
}
36+
})
37+
defer pool.Release()
38+
39+
ctx, _ := context.WithTimeout(context.TODO(), time.Second*2)
40+
var task customTypeFn = func() error {
41+
panic("custome")
42+
return nil
43+
}
44+
45+
pool.SubmitWithContext(ctx, task, wasps.WithRecoverFn(func(r interface{}) {
46+
log.Printf("catch panic : %+v", r)
47+
}))
48+
49+
time.Sleep(time.Second * 2)
50+
}

example/simple.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/yankooo/wasps"
7+
"log"
8+
"sync"
9+
"time"
10+
)
11+
12+
func main() {
13+
pool := wasps.NewCallback(5)
14+
defer func() {
15+
pool.Release()
16+
}()
17+
18+
var num = 10
19+
ctx, _ := context.WithTimeout(context.TODO(), 1*time.Second)
20+
21+
var wg sync.WaitGroup
22+
23+
wg.Add(3)
24+
pool.SubmitWithContext(ctx, func(args ...interface{}) {
25+
defer wg.Done()
26+
num := args[0].(int)
27+
log.Printf("first submit %d", num*2)
28+
}, wasps.WithArgs(num))
29+
30+
pool.Submit(func(args ...interface{}) {
31+
defer wg.Done()
32+
num := args[0].(int)
33+
log.Printf("second submit %d", num)
34+
}, wasps.WithArgs(num), wasps.WithRecoverFn(func(r interface{}) { fmt.Printf("catch panic: %+v\n", r) }))
35+
36+
num = 200
37+
pool.Submit(func(args ...interface{}) {
38+
defer wg.Done()
39+
log.Printf("third submit %d", num)
40+
})
41+
42+
wg.Wait()
43+
}

go.mod

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
module github.com/yankooo/wasps
2+
3+
go 1.12
4+
5+
require (
6+
github.com/davecgh/go-spew v1.1.1 // indirect
7+
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
8+
github.com/stretchr/testify v1.6.1
9+
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
10+
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c // indirect
11+
)

go.sum

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
2+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
4+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
5+
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
6+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
7+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
8+
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
9+
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
10+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
11+
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
12+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
13+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
14+
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c h1:grhR+C34yXImVGp7EzNk+DTIk+323eIUWOmEevy6bDo=
15+
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

option.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package wasps
2+
3+
type taskOptions struct {
4+
RecoverFn func(r interface{})
5+
Args []interface{}
6+
}
7+
8+
// TaskOption configures how we set up the connection.
9+
type TaskOption interface {
10+
apply(*taskOptions)
11+
}
12+
13+
// funcTaskOption wraps a function that modifies taskOptions into an
14+
// implementation of the TaskOption interface.
15+
type funcTaskOption struct {
16+
f func(*taskOptions)
17+
}
18+
19+
func (fdo *funcTaskOption) apply(do *taskOptions) {
20+
fdo.f(do)
21+
}
22+
23+
func newFuncTaskOption(f func(*taskOptions)) *funcTaskOption {
24+
return &funcTaskOption{
25+
f: f,
26+
}
27+
}
28+
29+
// WithRecoverFn returns task option for recover to catch panic
30+
func WithRecoverFn(f func(r interface{})) TaskOption {
31+
return newFuncTaskOption(func(o *taskOptions) {
32+
o.RecoverFn = f
33+
})
34+
}
35+
36+
// WithArgs returns task option for callback func args
37+
func WithArgs(args ...interface{}) TaskOption {
38+
return newFuncTaskOption(func(o *taskOptions) {
39+
o.Args = args
40+
})
41+
}
42+
43+
var defaultTaskOptions = &taskOptions{
44+
RecoverFn: func(r interface{}) {},
45+
}
46+
47+
type poolOptions struct {
48+
}
49+
50+
// PoolOption configures how we set up the connection.
51+
type PoolOption interface {
52+
apply(*poolOptions)
53+
}
54+
55+
// funcPoolOption wraps a function that modifies poolOptions into an
56+
// implementation of the PoolOption interface.
57+
type funcPoolOption struct {
58+
f func(*poolOptions)
59+
}
60+
61+
func (fdo *funcPoolOption) apply(do *poolOptions) {
62+
fdo.f(do)
63+
}
64+
65+
func newFuncPoolOption(f func(*poolOptions)) *funcPoolOption {
66+
return &funcPoolOption{
67+
f: f,
68+
}
69+
}
70+
71+
var defaultPoolOptions = &poolOptions{}

0 commit comments

Comments
 (0)