Skip to content
This repository was archived by the owner on Oct 31, 2023. It is now read-only.

Commit 9118ce3

Browse files
committed
move reproduce to mine.py
1 parent b23c1dd commit 9118ce3

File tree

10 files changed

+204
-478
lines changed

10 files changed

+204
-478
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ test:
185185

186186
test2:
187187
python -m cc_net mine --config config/test_segment.json
188-
python -m cc_net mine --config config/test_segment.json --metadata test_data2/mined
188+
python -m cc_net mine --config config/test_segment.json -p fetch_metadata split
189189
diff \
190190
<(zcat test_data/mined/2019-09/fr_head_0000.json.gz | jq -c 'select(.cc_segment == "crawl-data/CC-MAIN-2019-09/segments/1550247479101.30/wet/CC-MAIN-20190215183319-20190215205319-00000.warc.wet.gz") | {url, perplexity}' | sort) \
191191
<(zcat test_data2/mined/2019-09/CC-MAIN-20190215183319-20190215205319-00000.json.gz | jq -c 'select(.bucket == "head" and .language == "fr") | {url, perplexity}' | sort) \

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,3 +126,16 @@ You can peek at those files using UNIX tools `zcat` and [`jq`](https://stedolan.
126126

127127
By contributing to `cc_net`, you agree that your contributions will be licensed
128128
under the LICENSE file in the root directory of this source tree.
129+
130+
131+
## Output
132+
133+
```
134+
data/mined_by_segment/{dump}
135+
seg_000_000.json.gz
136+
seg_000_001.json.gz
137+
seg_000_002.json.gz
138+
139+
data/reproduce_by_lang/{dump}
140+
en_head_0000.json.gz
141+
```

cc_net/__main__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@
99
import func_argparse
1010

1111
import cc_net.mine
12-
import cc_net.minify
1312

1413

1514
def main():
1615
parser = func_argparse.multi_argparser(
17-
mine=cc_net.mine.get_main_parser(),
18-
reproduce=func_argparse.func_argparser(cc_net.minify.reproduce),
16+
mine=cc_net.mine.get_main_parser("mine"),
17+
reproduce=cc_net.mine.get_main_parser("reproduce"),
1918
)
2019
func_argparse.parse_and_call(parser)
2120

cc_net/execution.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919

2020
class Executor(Protocol):
21-
def __call__(self, function: Callable[..., Optional[str]], *args: Iterable) -> None:
21+
def __call__(self, function: Callable[..., str], *args: Iterable) -> None:
2222
...
2323

2424

@@ -88,7 +88,7 @@ def __call__(self, *args, **kwargs):
8888
**options,
8989
)
9090

91-
def submit_and_wait(function: Callable[..., Optional[str]], *args: Iterable):
91+
def submit_and_wait(function: Callable[..., str], *args: Iterable):
9292
f_name = function.__name__
9393

9494
assert len(args) > 0, f"No arguments passed to {f_name}"
@@ -105,15 +105,12 @@ def submit_and_wait(function: Callable[..., Optional[str]], *args: Iterable):
105105
print(f"Started {f_name} in job array {job_array_id} ({len(jobs)} jobs).")
106106
for job in submitit.helpers.as_completed(jobs):
107107
done += 1
108-
print(f"Finished job {job.job_id} ({done} / {total}).")
109108
e = job.exception()
110109
if not e:
111-
message = job.result()
112-
if message is not None:
113-
print(message)
110+
print(f"Finished job {job.job_id} ({done} / {total}).", job.result())
114111
continue
115112

116-
print(f"Failed job {job.job_id}:", e)
113+
print(f"Failed job {job.job_id} ({done} / {total}):", e)
117114
failed_jobs.append(job)
118115

119116
if failed_jobs:
@@ -137,7 +134,7 @@ def debug_executor(function: Callable[..., Optional[str]], *args: Iterable) -> N
137134
message = function(*x)
138135
except Exception:
139136
try:
140-
import ipdb as pdb
137+
import ipdb as pdb # type: ignore
141138
except ImportError:
142139
import pdb # type: ignore
143140
import traceback

cc_net/mine.py

Lines changed: 87 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ class Config(NamedTuple):
5151
5252
config_name
5353
dump: CC dump id
54-
output_dir: where to write the dataset
54+
output_dir: working directory
55+
mined_dir: name of the destination folder, full path will be {output_dir}/{mined_dir}/{dump_id}
5556
execution: choose how to parallelize the execution
5657
num_shards: number of shards to split the dump
5758
num_segments_per_shard: allow to download a small portion of CC (eg for tests)
@@ -74,6 +75,7 @@ class Config(NamedTuple):
7475
config_name: str = "base"
7576
dump: str = "2017-51"
7677
output_dir: Path = Path("data")
78+
mined_dir: str = "mined"
7779
execution: str = "slurm"
7880
num_shards: int = 1600
7981
num_segments_per_shard: int = -1
@@ -135,7 +137,7 @@ def from_json(cls, json_file: Path) -> "Config":
135137

136138
@property
137139
def will_split(self) -> bool:
138-
return "split" in self.pipeline or "split_by_segment" in self.pipeline
140+
return "split_by_lang" in self.pipeline or "split_by_segment" in self.pipeline
139141

140142
def get_lm_languages(self) -> Sequence[str]:
141143
if self.lm_languages is not None:
@@ -154,9 +156,18 @@ def _get_dir(self, name: str, regroup: bool = False) -> Path:
154156
return self.output_dir / f"{name}_split" / self.dump
155157
return self.output_dir / name / self.dump
156158

159+
def get_mined_dir(self) -> Path:
160+
return self._get_dir(self.mined_dir)
161+
157162

158163
BASE_CONFIG = Config()
159164

165+
BYLANG_CONFIG = Config(
166+
config_name="by_lang",
167+
mined_dir="mined_by_lang",
168+
pipeline=list(BASE_CONFIG.pipeline[:-1]) + ["split_by_lang"],
169+
)
170+
160171
TEST_CONFIG = BASE_CONFIG._replace(
161172
config_name="test",
162173
dump="2019-09",
@@ -174,6 +185,7 @@ def _get_dir(self, name: str, regroup: bool = False) -> Path:
174185

175186
PREDEF_CONFIGS = {
176187
"base": BASE_CONFIG,
188+
"by_lang": BYLANG_CONFIG,
177189
"test": TEST_CONFIG,
178190
"test_slurm": TEST_CONFIG._replace(execution="slurm,partition=dev"),
179191
"debug": TEST_CONFIG._replace(config_name="debug", mine_num_processes=0),
@@ -248,13 +260,12 @@ def _hashes_shard(conf: Config, shard: int, output: Path):
248260

249261
def mine(conf: Config) -> List[Path]:
250262
"""Remove dups, run LID and LMs, and split by lang and quality."""
263+
mined_dir = conf.get_mined_dir() / conf.dump
251264
if conf.will_split:
252265
# Give directories when splitting
253-
mined_dir = conf.output_dir / "mined_split" / conf.dump
254266
outputs = [mined_dir / f"{shard:04d}" for shard in range(conf.num_shards)]
255267
else:
256268
# Files otherwise
257-
mined_dir = conf.output_dir / "mined" / conf.dump
258269
outputs = [
259270
mined_dir / f"{shard:04d}.json.gz" for shard in range(conf.num_shards)
260271
]
@@ -370,7 +381,7 @@ def _mine_shard(conf: Config, hashes: List[Path], shard: int, output: Path) -> s
370381
steps["drop"] = perplexity.DropKeys(tok_field)
371382

372383
pattern = str(tmp_output / "{language}_{bucket}.json.gz")
373-
steps["split"] = jsonql.split(pattern=str(pattern), mkdir=True)
384+
steps["split_by_lang"] = jsonql.split(pattern=str(pattern), mkdir=True)
374385

375386
steps["split_by_segment"] = jsonql.split(
376387
split_fn=lambda doc: _get_segment(tmp_output, doc), mkdir=True
@@ -391,54 +402,6 @@ def _mine_shard(conf: Config, hashes: List[Path], shard: int, output: Path) -> s
391402
return f"Mined {output}"
392403

393404

394-
def reproduce(conf: Config) -> List[Path]:
395-
reproduce_dir = conf._get_dir("reproduce")
396-
reproduce_dir.mkdir(parents=True, exist_ok=True)
397-
if conf.will_split:
398-
# Give directories when splitting
399-
outputs = [reproduce_dir / f"{shard:04d}" for shard in range(conf.num_shards)]
400-
else:
401-
# Files otherwise
402-
outputs = [
403-
reproduce_dir / f"{shard:04d}.json.gz" for shard in range(conf.num_shards)
404-
]
405-
missing_outputs = [(shard, o) for shard, o in enumerate(outputs) if not o.exists()]
406-
if not missing_outputs:
407-
return outputs
408-
409-
ex = conf.get_executor("reproduce", timeout_hour=2, mem_gb=2, cpus=2)
410-
ex(_reproduce_shard, repeat(conf), *_transpose(missing_outputs))
411-
return outputs
412-
413-
414-
def _reproduce_shard(conf: Config, shard: int, output: Path) -> str:
415-
from cc_net import transpose
416-
417-
assert conf.metadata is not None
418-
tmp_output = tmp(output)
419-
cc = process_wet_file.CCShardReader(
420-
conf.dump,
421-
shard,
422-
num_shards=conf.num_shards,
423-
num_segments_per_shard=conf.num_segments_per_shard,
424-
cache_dir=conf.cache_dir,
425-
)
426-
427-
unminifier = transpose.LinearUnminifier(conf.metadata / conf.dump)
428-
# TODO: we should look at the conf to see how to split
429-
pipeline: List[jsonql.Transformer] = [unminifier]
430-
431-
if conf.will_split:
432-
pattern = str(tmp_output / "{language}_{bucket}.json.gz")
433-
pipeline.append(jsonql.split(pattern=str(pattern), mkdir=True))
434-
435-
jsonql.run_pipes(
436-
*pipeline, file=cc, output=tmp_output if not conf.will_split else None
437-
)
438-
tmp_output.rename(output)
439-
return f"Unminified {output}"
440-
441-
442405
def regroup(conf: Config, before: Callable[[Config], List[Path]], dirname: str) -> Path:
443406
"""Reshards each language/quality after 'mine'."""
444407
mined_dir = conf.output_dir / f"{dirname}_split" / conf.dump
@@ -450,7 +413,6 @@ def regroup(conf: Config, before: Callable[[Config], List[Path]], dirname: str)
450413
print(f"No files found in {mined_dir} for regroup. Exiting.")
451414
return regroup_dir
452415

453-
# check that mining is over.
454416
all_files = [f for d in before(conf) for f in d.glob("*.json.gz")]
455417
assert all_files, f"No files found inside mined dir: {mined_dir}"
456418

@@ -535,7 +497,7 @@ def move_segments(conf: Config, first_stage: Callable, dirname: str) -> Path:
535497
regroup_dir.mkdir(exist_ok=True)
536498
ex = conf.get_executor(f"moveseg_{conf.dump}", mem_gb=1, timeout_hour=1, cpus=2)
537499

538-
def _move_segments(subdir: Path, regroup_dir: Path) -> Optional[str]:
500+
def _move_segments(subdir: Path, regroup_dir: Path) -> str:
539501
n = 0
540502
for f in subdir.iterdir():
541503
if not f.is_file() or f.is_symlink():
@@ -549,7 +511,7 @@ def _move_segments(subdir: Path, regroup_dir: Path) -> Optional[str]:
549511
f.symlink_to(target)
550512

551513
if n == 0:
552-
return None
514+
return ""
553515

554516
return f"Moved {n} .json.gz files from {subdir} to {regroup_dir}"
555517

@@ -615,19 +577,70 @@ def dump(x):
615577

616578

617579
def get_main_parser() -> ArgumentParser:
618-
# Generates the 'main' parser by patching a 'Config' parser
619-
p = func_argparse.func_argparser(Config)
580+
def _parser(entry_point: str) -> ArgumentParser:
581+
# Generates the 'main' parser by patching a 'Config' parser
582+
p = func_argparse.func_argparser(Config)
583+
584+
# Override defaults value to None, so we know what was set by the user.
585+
# Note that it will keep the original default values in the help message.
586+
p.set_defaults(**{f: None for f in Config._fields})
587+
588+
p.add_argument("--config", type=str, default="base")
589+
p.set_defaults(__command=main)
590+
p.set_defaults(entry_point=entry_point)
591+
return p
592+
593+
return func_argparse.multi_argparser(
594+
mine=_parser("mine"),
595+
# TODO: we should hide parameters not used in `reproduce`
596+
reproduce=_parser("reproduce"),
597+
)
620598

621-
# Override defaults value to None, so we know what was set by the user.
622-
# Note that it will keep the original default values in the help message.
623-
p.set_defaults(**{f: None for f in Config._fields})
624599

625-
p.add_argument("--config", type=str, default="base")
626-
p.set_defaults(__command=main)
627-
return p
600+
def reproduce(conf: Config) -> List[Path]:
601+
reproduce_dir = conf._get_dir("reproduce")
602+
reproduce_dir.mkdir(parents=True, exist_ok=True)
603+
if conf.will_split:
604+
# Give directories when splitting
605+
outputs = [reproduce_dir / f"{shard:04d}" for shard in range(conf.num_shards)]
606+
else:
607+
# Files otherwise
608+
outputs = [
609+
reproduce_dir / f"{shard:04d}.json.gz" for shard in range(conf.num_shards)
610+
]
611+
missing_outputs = [(shard, o) for shard, o in enumerate(outputs) if not o.exists()]
612+
if not missing_outputs:
613+
return outputs
628614

615+
ex = conf.get_executor("reproduce", timeout_hour=2, mem_gb=2, cpus=2)
616+
ex(_reproduce_shard, repeat(conf), *_transpose(missing_outputs))
617+
return outputs
629618

630-
def main(config: str = "base", **config_as_dict: Any) -> None:
619+
620+
def _reproduce_shard(conf: Config, shard: int, output: Path) -> str:
621+
metadata = conf.metadata
622+
if metadata is None and (conf.output_dir / "mined").exists():
623+
# TODO: better default
624+
metadata = conf.output_dir / "mined"
625+
print(f"Will use {metadata} as metadata source")
626+
assert metadata is not None, "Need to set 'metadata' for reproduce"
627+
cc = conf.get_cc_shard(shard)
628+
629+
unminifier = minify.MetadataFetcher(metadata / conf.dump)
630+
# TODO: we should look at the conf to see how to split
631+
pipeline: List[jsonql.Transformer] = [unminifier]
632+
633+
tmp_output = tmp(output)
634+
if conf.will_split:
635+
pattern = str(tmp(output) / "{language}_{bucket}.json.gz")
636+
pipeline.append(jsonql.split(pattern=str(pattern), mkdir=True))
637+
638+
jsonql.run_pipes(*pipeline, file=cc, output=None if conf.will_split else tmp_output)
639+
tmp_output.rename(output)
640+
return f"Unminified {output}"
641+
642+
643+
def main(entry_point: str, config: str = "base", **config_as_dict: Any) -> None:
631644
# Use the given 'config' as default value.
632645
config_base = config
633646
if config_base in PREDEF_CONFIGS:
@@ -640,32 +653,22 @@ def main(config: str = "base", **config_as_dict: Any) -> None:
640653
f"Choose from ({', '.join(PREDEF_CONFIGS)}) or give an existing .json file."
641654
)
642655
conf = conf._replace(**{k: v for (k, v) in config_as_dict.items() if v is not None})
643-
print("Will run mine.py with the following config:", conf)
644-
645-
# Decide if we need to mine or if we have metadata available
646-
647-
if conf.metadata:
648-
conf = conf._replace(pipeline=["split"])
649-
# this is not very clean. We should either:
650-
# - move back to the reproduce command
651-
# - add an 'unminify' step that read conf.metadata
652656

653-
print(f"Will use pre-computed metadata from {conf.metadata}")
654-
first_stage = reproduce
655-
dir_name = "reproduce"
656-
657-
else:
658-
first_stage = mine
659-
dir_name = "mined"
657+
print(f"Will run cc_net.mine.{entry_point} with the following config:", conf)
658+
first_stage = {"mine": mine, "reproduce": reproduce}[entry_point]
659+
dir_name = entry_point
660660
regroup_dir = conf._get_dir(dir_name, regroup=True)
661-
if "split" in conf.pipeline:
662-
# Only regroup if we split the shards.
661+
662+
if "split_by_lang" in conf.pipeline:
663+
# Only try regrouping if we split the shards.
663664
regroup(conf, first_stage, dir_name)
664665
elif "split_by_segment" in conf.pipeline:
665666
# If we split by segment then regrouping is trivial, since segments appear in only one shard.
666667
move_segments(conf, first_stage, dir_name)
668+
else:
669+
first_stage(conf)
667670

668-
if config_base == "test":
671+
if conf.config_name == "test":
669672
_validate_test(conf, regroup_dir)
670673

671674

0 commit comments

Comments
 (0)