Skip to content
This repository was archived by the owner on Oct 31, 2023. It is now read-only.

Commit 6e11403

Browse files
committed
implement reproduce as another mining pipeline.
1 parent 9118ce3 commit 6e11403

File tree

5 files changed

+64
-15
lines changed

5 files changed

+64
-15
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,10 +185,10 @@ test:
185185

186186
test2:
187187
python -m cc_net mine --config config/test_segment.json
188-
python -m cc_net mine --config config/test_segment.json -p fetch_metadata split
188+
python -m cc_net mine --config config/test_reproduce.json
189189
diff \
190190
<(zcat test_data/mined/2019-09/fr_head_0000.json.gz | jq -c 'select(.cc_segment == "crawl-data/CC-MAIN-2019-09/segments/1550247479101.30/wet/CC-MAIN-20190215183319-20190215205319-00000.warc.wet.gz") | {url, perplexity}' | sort) \
191-
<(zcat test_data2/mined/2019-09/CC-MAIN-20190215183319-20190215205319-00000.json.gz | jq -c 'select(.bucket == "head" and .language == "fr") | {url, perplexity}' | sort) \
191+
<(zcat test_data2/mined_by_segment/2019-09/CC-MAIN-20190215183319-20190215205319-00000.json.gz | jq -c 'select(.bucket == "head" and .language == "fr") | {url, perplexity}' | sort) \
192192
| head
193193

194194
diff \

cc_net/__main__.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,7 @@
1212

1313

1414
def main():
15-
parser = func_argparse.multi_argparser(
16-
mine=cc_net.mine.get_main_parser("mine"),
17-
reproduce=cc_net.mine.get_main_parser("reproduce"),
18-
)
19-
func_argparse.parse_and_call(parser)
15+
func_argparse.parse_and_call(cc_net.mine.get_main_parser())
2016

2117

2218
if __name__ == "__main__":

cc_net/mine.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
"lm",
4242
"pp_bucket",
4343
"drop",
44-
"split",
44+
"split_by_lang",
4545
]
4646

4747

@@ -156,7 +156,7 @@ def _get_dir(self, name: str, regroup: bool = False) -> Path:
156156
return self.output_dir / f"{name}_split" / self.dump
157157
return self.output_dir / name / self.dump
158158

159-
def get_mined_dir(self) -> Path:
159+
def get_mined_dir(self, regroup_dir: bool = False) -> Path:
160160
return self._get_dir(self.mined_dir)
161161

162162

@@ -168,6 +168,13 @@ def get_mined_dir(self) -> Path:
168168
pipeline=list(BASE_CONFIG.pipeline[:-1]) + ["split_by_lang"],
169169
)
170170

171+
REPRODUCE_CONFIG = Config(
172+
config_name="reproduce",
173+
mined_dir="reproduce",
174+
pipeline=["fetch_metadata", "split_by_lang"],
175+
execution="local",
176+
)
177+
171178
TEST_CONFIG = BASE_CONFIG._replace(
172179
config_name="test",
173180
dump="2019-09",
@@ -189,6 +196,7 @@ def get_mined_dir(self) -> Path:
189196
"test": TEST_CONFIG,
190197
"test_slurm": TEST_CONFIG._replace(execution="slurm,partition=dev"),
191198
"debug": TEST_CONFIG._replace(config_name="debug", mine_num_processes=0),
199+
"reproduce": REPRODUCE_CONFIG,
192200
"augment": BASE_CONFIG._replace(
193201
config_name="augment", dump="2019-13", lang_blacklist=["en"]
194202
),
@@ -260,7 +268,7 @@ def _hashes_shard(conf: Config, shard: int, output: Path):
260268

261269
def mine(conf: Config) -> List[Path]:
262270
"""Remove dups, run LID and LMs, and split by lang and quality."""
263-
mined_dir = conf.get_mined_dir() / conf.dump
271+
mined_dir = conf.get_mined_dir()
264272
if conf.will_split:
265273
# Give one directory per shard when splitting
266274
outputs = [mined_dir / f"{shard:04d}" for shard in range(conf.num_shards)]
@@ -380,13 +388,19 @@ def _mine_shard(conf: Config, hashes: List[Path], shard: int, output: Path) -> s
380388
steps["pp_bucket"] = perplexity.PerplexityBucket(CUTOFF_CSV)
381389
steps["drop"] = perplexity.DropKeys(tok_field)
382390

391+
if "fetch_metadata" in conf.pipeline:
392+
# TODO: better default
393+
assert conf.metadata is not None
394+
steps["fetch_metadata"] = minify.MetadataFetcher(f"{conf.metadata}/{conf.dump}")
395+
396+
steps["minify"] = minify.Minifier()
397+
383398
pattern = str(tmp_output / "{language}_{bucket}.json.gz")
384399
steps["split_by_lang"] = jsonql.split(pattern=str(pattern), mkdir=True)
385400

386401
steps["split_by_segment"] = jsonql.split(
387402
split_fn=lambda doc: _get_segment(tmp_output, doc), mkdir=True
388403
)
389-
steps["minify"] = minify.Minifier()
390404

391405
pipeline = filter(None, (steps[s] for s in conf.pipeline))
392406

@@ -404,8 +418,8 @@ def _mine_shard(conf: Config, hashes: List[Path], shard: int, output: Path) -> s
404418

405419
def regroup(conf: Config, before: Callable[[Config], List[Path]], dirname: str) -> Path:
406420
"""Reshards each language/quality after 'mine'."""
407-
mined_dir = conf.output_dir / f"{dirname}_split" / conf.dump
408-
regroup_dir = conf.output_dir / dirname / conf.dump
421+
mined_dir = conf.output_dir / f"{conf.mined_dir}_split" / conf.dump
422+
regroup_dir = conf.output_dir / conf.mined_dir / conf.dump
409423

410424
if mined_dir.exists():
411425
all_files = list(mined_dir.glob("????/*.json.gz"))
@@ -656,8 +670,7 @@ def main(entry_point: str, config: str = "base", **config_as_dict: Any) -> None:
656670

657671
print(f"Will run cc_net.mine.{entry_point} with the following config:", conf)
658672
first_stage = {"mine": mine, "reproduce": reproduce}[entry_point]
659-
dir_name = entry_point
660-
regroup_dir = conf._get_dir(dir_name, regroup=True)
673+
dir_name = conf.mined_dir
661674

662675
if "split_by_lang" in conf.pipeline:
663676
# Only try regrouping if we split the shards.
@@ -669,6 +682,7 @@ def main(entry_point: str, config: str = "base", **config_as_dict: Any) -> None:
669682
first_stage(conf)
670683

671684
if conf.config_name == "test":
685+
regroup_dir = conf._get_dir(dir_name, regroup=True)
672686
_validate_test(conf, regroup_dir)
673687

674688

config/test_reproduce.json

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"hash_in_mem": 2,
3+
"dump": "2019-09",
4+
"num_shards": 4,
5+
"num_segments_per_shard": 1,
6+
"pipeline": [
7+
"fetch_metadata",
8+
"split_by_lang"
9+
],
10+
"metadata": "test_data2/mined_by_segment",
11+
"execution": "debug",
12+
"output_dir": "test_data2",
13+
"mined_dir": "reproduce",
14+
"target_size": "32M",
15+
"cache_dir": "test_data/wet_cache"
16+
}

config/test_segment.json

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"hash_in_mem": 2,
3+
"dump": "2019-09",
4+
"num_shards": 4,
5+
"num_segments_per_shard": 1,
6+
"mine_num_processes": 0,
7+
"lang_whitelist": ["de", "it", "fr"],
8+
"pipeline": [
9+
"dedup",
10+
"lid",
11+
"keep_lang",
12+
"sp",
13+
"lm",
14+
"pp_bucket",
15+
"minify",
16+
"split_by_segment"
17+
],
18+
"execution": "debug",
19+
"output_dir": "test_data2",
20+
"mined_dir": "mined_by_segment",
21+
"target_size": "32M",
22+
"cache_dir": "test_data/wet_cache"
23+
}

0 commit comments

Comments
 (0)