|
2 | 2 | from __future__ import annotations |
3 | 3 |
|
4 | 4 | import os.path |
| 5 | +import pickle |
5 | 6 | from pathlib import Path |
6 | 7 |
|
7 | 8 | import gffutils |
@@ -278,6 +279,41 @@ def fetch_bias_bw(self, verify=False): |
278 | 279 |
|
279 | 280 | return bias_bw |
280 | 281 |
|
| 282 | + def fetch_cpg_index(self): |
| 283 | + fa_path = str(self.fetch_fa()) |
| 284 | + cpg_index = fa_path + "_cpg_index.pkl" |
| 285 | + if not os.path.exists(cpg_index): |
| 286 | + print("Building CpG index...") |
| 287 | + cpg_index_all, cpg_num, cpg_coord = build_CpG_index(self) |
| 288 | + with open(cpg_index, "wb") as f: |
| 289 | + pickle.dump((cpg_index_all, cpg_num, cpg_coord), f) |
| 290 | + else: |
| 291 | + with open(cpg_index, "rb") as f: |
| 292 | + cpg_index_all, cpg_num, cpg_coord = pickle.load(f) |
| 293 | + return cpg_index_all, cpg_num, cpg_coord |
| 294 | + |
| 295 | + |
| 296 | +def build_CpG_index(genome): |
| 297 | + fa_path = Path(genome.fetch_fa()) |
| 298 | + fasta = Fasta(fa_path, as_raw=True) |
| 299 | + cpg_index_all = {} |
| 300 | + cpg_num = {} |
| 301 | + chroms_all = set(genome.chrom_sizes.keys()) |
| 302 | + cpg_coord = {} |
| 303 | + for chrom in chroms_all: |
| 304 | + cpg_index_all[chrom] = {} |
| 305 | + cpg_coord[chrom] = [] |
| 306 | + seq = fasta[chrom][:].upper() |
| 307 | + count = 0 |
| 308 | + for i in trange(len(seq) - 1): |
| 309 | + if seq[i : i + 2] == "CG": |
| 310 | + cpg_index_all[chrom][i + 1] = count |
| 311 | + cpg_coord[chrom].append(i + 1) |
| 312 | + count += 1 |
| 313 | + cpg_num[chrom] = count |
| 314 | + fasta.close() |
| 315 | + return cpg_index_all, cpg_num, cpg_coord |
| 316 | + |
281 | 317 |
|
282 | 318 | def predict_genome_tn5_bias( |
283 | 319 | fa_file, |
|
0 commit comments