
Commit caddb10 (initial commit: 0 parents)

Big bang.
3 files changed: +242 -0 lines

.gitignore

Lines changed: 24 additions & 0 deletions
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so

# Folders
_obj
_test

# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out

*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*

_testmain.go

*.exe
*.test
*.prof

main.go

Lines changed: 173 additions & 0 deletions
package main

import (
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"math"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
)

var parallel = flag.Int("p", 10, "number of parallel downloads")
var mergeMode = flag.Bool("m", false, "merge the downloaded parts into a single output file")

// HDFS wraps the `hadoop` command-line client.
type HDFS struct {
	hadoopHome string
	hadoopCmd  string
	fsName     string // reserved for job configuration, currently unused
	fsUgi      string // reserved for job configuration, currently unused
}

// NewHDFS locates the hadoop binary via $HADOOP_HOME.
func NewHDFS() HDFS {
	hadoopHome := os.Getenv("HADOOP_HOME")
	return HDFS{
		hadoopHome: hadoopHome,
		hadoopCmd:  filepath.Join(hadoopHome, "bin", "hadoop"),
	}
}

// Ls lists the plain files (not directories) under inputPath.
func (s HDFS) Ls(inputPath string) []string {
	log.Printf("ls: %s\n", inputPath)
	output := s.Exec("fs", "-ls", inputPath)
	files := []string{}
	for _, line := range strings.Split(output, "\n") {
		line = strings.TrimSpace(line)
		// Keep only regular files: their permission string starts with "-".
		if !strings.HasPrefix(line, "-") {
			continue
		}
		// The last whitespace-separated field of an `fs -ls` line is the path.
		fields := strings.Fields(line)
		files = append(files, fields[len(fields)-1])
	}
	return files
}

// Get copies a single remote file into the local directory.
func (s HDFS) Get(remote string, local string) {
	s.Exec("fs", "-get", remote, local)
}

// Gets copies the remote files into the local directory, one by one.
func (s HDFS) Gets(remoteFiles []string, local string) {
	for _, f := range remoteFiles {
		s.Get(f, local)
	}
}

// Exec runs `hadoop <subCmd> <args...>` and returns its combined output.
func (s HDFS) Exec(subCmd string, args ...string) string {
	jobConf := []string{} // placeholder for extra job-configuration flags
	fullCmd := []string{subCmd}
	for _, extra := range [][]string{jobConf, args} {
		fullCmd = append(fullCmd, extra...)
	}
	cmd := exec.Command(s.hadoopCmd, fullCmd...)
	stdout, err := cmd.CombinedOutput()
	if err != nil {
		log.Printf("hadoop %s failed: %v", strings.Join(fullCmd, " "), err)
	}
	return string(stdout)
}

// downloadWithChan downloads each remote file into localDir and reports the
// resulting local path on ch so the consumer can merge files as they arrive.
func downloadWithChan(hdfs HDFS, remoteFiles []string, localDir string, ch chan string) {
	for _, f := range remoteFiles {
		hdfs.Get(f, localDir)
		ch <- filepath.Join(localDir, filepath.Base(f))
	}
}

// chunk splits lst into at most chunkNum slices of roughly equal size.
func chunk(lst []string, chunkNum int) (output [][]string) {
	if chunkNum <= 0 || len(lst) == 0 {
		return nil // also avoids an int(NaN) conversion when both are zero
	}
	chunkSize := int(math.Ceil(float64(len(lst)) / float64(chunkNum)))
	for i := 0; i < chunkNum; i++ {
		start := i * chunkSize
		end := start + chunkSize
		if start >= len(lst) {
			break
		}
		if end > len(lst) {
			end = len(lst)
		}
		output = append(output, lst[start:end])
	}
	return output
}

// checkDir verifies that path exists and is a directory.
func checkDir(path string) error {
	fileInfo, err := os.Stat(path)
	if err != nil {
		return err
	}
	if !fileInfo.IsDir() {
		return fmt.Errorf("path is not a directory: %s", path)
	}
	return nil
}

func main() {
	flag.Parse()
	if flag.NArg() != 2 {
		fmt.Println("Two arguments (input and output) are required!")
		flag.Usage()
		os.Exit(1)
	}
	input := flag.Arg(0)
	output := flag.Arg(1)
	log.Println("input: ", input)
	log.Println("output: ", output)
	if !*mergeMode {
		// In directory mode the output must be an existing directory.
		if err := checkDir(output); err != nil {
			log.Fatal(err)
		}
	}
	hdfs := NewHDFS()
	files := hdfs.Ls(input)
	slices := chunk(files, *parallel)
	var wg sync.WaitGroup
	if *mergeMode {
		tempDir, err := ioutil.TempDir("", "pdoop")
		if err != nil {
			log.Fatal(err)
		}
		log.Println("tempDir: ", tempDir)
		defer os.RemoveAll(tempDir)
		// Fan out: each goroutine downloads one batch into the temp dir and
		// reports every finished file's local path on ch.
		ch := make(chan string, len(files))
		for _, batch := range slices {
			wg.Add(1)
			go func(batch []string) {
				defer wg.Done()
				downloadWithChan(hdfs, batch, tempDir, ch)
			}(batch)
		}
		// Close ch once every download goroutine has finished, so the
		// merge loop below terminates.
		go func() {
			wg.Wait()
			close(ch)
		}()
		// Fan in: append each downloaded part to the single output file.
		out, err := os.Create(output)
		if err != nil {
			log.Fatal(err)
		}
		defer out.Close()
		for f := range ch {
			in, err := os.Open(f)
			if err != nil {
				log.Fatal(err)
			}
			if _, err := io.Copy(out, in); err != nil {
				log.Fatal(err)
			}
			in.Close()
			os.Remove(f)
		}
	} else {
		// Directory mode: download every batch straight into the output dir.
		for _, batch := range slices {
			wg.Add(1)
			go func(batch []string) {
				defer wg.Done()
				hdfs.Gets(batch, output)
			}(batch)
		}
		wg.Wait()
	}
}
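
The merge path above is a standard fan-out/fan-in shape: worker goroutines send on a buffered channel, one extra goroutine closes the channel after the WaitGroup drains, and the consumer ranges until the close. A minimal self-contained sketch of just that pattern, with plain strings standing in for the hadoop downloads (the identifiers here are illustrative, not from the commit):

package main

import (
	"fmt"
	"sync"
)

func main() {
	files := []string{"part-0", "part-1", "part-2", "part-3"}
	ch := make(chan string, len(files)) // buffered, so senders never block
	var wg sync.WaitGroup
	for _, batch := range [][]string{files[:2], files[2:]} { // two "workers"
		wg.Add(1)
		go func(batch []string) {
			defer wg.Done()
			for _, f := range batch {
				ch <- f // stand-in for downloading f and reporting its local path
			}
		}(batch)
	}
	// Close ch only after every worker is done, so the range below terminates.
	go func() {
		wg.Wait()
		close(ch)
	}()
	for f := range ch {
		fmt.Println("merged:", f) // stand-in for appending f to the output file
	}
}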

main_test.go

Lines changed: 45 additions & 0 deletions
package main

import (
	"fmt"
	"math/rand"
	"reflect"
	"sort"
	"testing"
)

// generateHDFSFiles produces n fake part-file names, e.g. "part-0".
func generateHDFSFiles(n int) (output []string) {
	for i := 0; i < n; i++ {
		output = append(output, fmt.Sprintf("part-%d", i))
	}
	return output
}

// mergeSlices flattens a slice of slices back into a single slice.
func mergeSlices(input [][]string) (output []string) {
	for _, slice := range input {
		output = append(output, slice...)
	}
	return output
}

// TestChunk checks that chunking and re-merging a random list preserves
// both its length and its elements.
func TestChunk(t *testing.T) {
	const (
		testTimes   = 1000
		maxN        = 10000
		maxChunkNum = 10000
	)
	for i := 0; i < testTimes; i++ {
		lst := generateHDFSFiles(rand.Intn(maxN))
		chunkNum := rand.Intn(maxChunkNum) + 1 // chunk expects a positive count
		slices := chunk(lst, chunkNum)
		merged := mergeSlices(slices)
		if len(merged) != len(lst) {
			t.Errorf("merged chunks have length %d, expected %d", len(merged), len(lst))
		}
		sort.Strings(merged)
		sort.Strings(lst)
		if !reflect.DeepEqual(lst, merged) {
			t.Errorf("merged chunks don't equal the original list")
		}
	}
}
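
Since chunk is only exercised property-style above, a testable example could pin down the exact split shape. This is a hypothetical addition, not part of the commit; it assumes it sits alongside main_test.go in package main, where fmt is already imported. With 7 items split 3 ways, chunkSize = ceil(7/3) = 3:

// Hypothetical testable example (not in the original commit): documents
// chunk's exact splitting for 7 items in 3 chunks (ceil(7/3) = 3 per chunk).
func Example_chunk() {
	fmt.Println(chunk([]string{"a", "b", "c", "d", "e", "f", "g"}, 3))
	// Output: [[a b c] [d e f] [g]]
}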
