Commit 8a936a2
refactor(loader): support concurrent readers, short-id & Graphsrc (#683)
* update pom and readme for version-1.7.0
* added GraphSource & GraphReader;
added method InputReader.multiReaders() and adapted it for all source types
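* Added AFSSource and GraphSource; partially upgraded HDFSSource;
the multi-file input part is not yet confirmed complete, this is preliminary progress
Related configuration & detail changes:
1. FileSource gains the dir_filter and extra_date_formats parameters, and its constructor was changed accordingly; it also adds support for case-insensitive ORC/Parquet file headers (FileSource.headerCaseSensitive) and a splitCount that applies to a single file, improving the flexibility and compatibility of file loading.
2. InputSource adds headerCaseSensitive(), which defaults to case-sensitive.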
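As a rough illustration of what a headerCaseSensitive switch could mean for column lookup, the sketch below resolves a column name against a file header either exactly or ignoring case. The class and method names (HeaderLookupSketch, indexOf) are hypothetical and not taken from the loader; only the headerCaseSensitive flag name comes from the commit message.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not the loader's actual code.
class HeaderLookupSketch {

    // Returns the index of `column` in `header`, or -1 if it is absent.
    // With headerCaseSensitive=false, "Name" and "name" are the same column.
    static int indexOf(List<String> header, String column,
                       boolean headerCaseSensitive) {
        for (int i = 0; i < header.size(); i++) {
            String name = header.get(i);
            boolean match = headerCaseSensitive
                            ? name.equals(column)
                            : name.equalsIgnoreCase(column);
            if (match) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> header = Arrays.asList("ID", "Name", "Age");
        // Case-sensitive lookup misses "name"; case-insensitive finds it.
        System.out.println(indexOf(header, "name", true));
        System.out.println(indexOf(header, "name", false));
    }
}
```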
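Multi-file input
FileReader.java
init() now only calls progress(context, struct) and no longer scans files.
File scanning and reader-splitting logic moved into the split() method:
Call scanReadables() to get all files
Sort them
Create multiple child FileReader instances, one per file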
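The three steps of split() can be sketched as follows. This is a simplified stand-in, not the real FileReader: file names are plain strings, scanReadables() returns a fixed list instead of listing a directory, and the constructor shape is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for FileReader; only split() and scanReadables()
// are names taken from the commit message.
class FileReaderSplitSketch {

    private final List<String> candidates; // files this reader may scan
    private final String file;             // null on the parent reader

    FileReaderSplitSketch(List<String> candidates, String file) {
        this.candidates = candidates;
        this.file = file;
    }

    String file() {
        return this.file;
    }

    // Stand-in for scanReadables(): a real reader would list a directory.
    List<String> scanReadables() {
        return new ArrayList<>(this.candidates);
    }

    // Scan all files, sort them, then create one child reader per file.
    List<FileReaderSplitSketch> split() {
        List<String> readables = this.scanReadables();
        Collections.sort(readables);
        List<FileReaderSplitSketch> children = new ArrayList<>();
        for (String readable : readables) {
            children.add(new FileReaderSplitSketch(
                         Collections.singletonList(readable), readable));
        }
        return children;
    }

    public static void main(String[] args) {
        FileReaderSplitSketch parent = new FileReaderSplitSketch(
                Arrays.asList("b.csv", "a.csv", "c.csv"), null);
        for (FileReaderSplitSketch child : parent.split()) {
            System.out.println(child.file());
        }
    }
}
```

Sorting before splitting gives the child readers a stable order across runs, which matters when progress is later matched back to files by name.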
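InputProgress.java
New version
- Progress tracking is based on a Map of file name -> InputItemProgress
- The loading state (loaded / loading) of multiple files can be tracked at the same time
- Supports multi-threaded concurrency and finer-grained control (for example, confirming the offset of a single file only, or marking a single file as loaded)
Related interface refactoring
Old version
- loadingItem(): returns the single loadingItem
- addLoadingItem(InputItemProgress): replaces the current loadingItem; the old one is moved to loadedItems
- loadingOffset(): returns the current loadingItem.offset()
- markLoaded(boolean markAll)
New version
- loadingItem(String name): looks up the loadingItem by file name
- addLoadingItem(String name, InputItemProgress): adds an item keyed by file name
- loadingOffset() was removed: with multi-file support, the offset must be read per file
- markLoaded(Readable readable, boolean markAll):
  - if readable is passed → move the corresponding file from loadingItems to loadedItems
  - otherwise (readable=null and markAll=true) → move all loadingItems over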
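A minimal sketch of the name-keyed progress tracking described in this list, under stated assumptions: ItemProgress is a hypothetical stand-in for InputItemProgress that holds only an offset, the file name is used in place of a Readable argument, and the counting methods exist only for the demonstration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the new InputProgress behaviour.
class InputProgressSketch {

    // Stand-in for InputItemProgress: tracks only a per-file offset.
    static class ItemProgress {
        private long offset;

        long offset() {
            return this.offset;
        }

        void offset(long offset) {
            this.offset = offset;
        }
    }

    private final Map<String, ItemProgress> loadedItems = new LinkedHashMap<>();
    private final Map<String, ItemProgress> loadingItems = new LinkedHashMap<>();

    // Register a file that is now being loaded, keyed by its name.
    synchronized void addLoadingItem(String name, ItemProgress item) {
        this.loadingItems.put(name, item);
    }

    // Look up the in-flight progress of one file; null if not loading.
    synchronized ItemProgress loadingItem(String name) {
        return this.loadingItems.get(name);
    }

    // With a name: move that one file from loading to loaded.
    // With name == null and markAll == true: move every loading item.
    synchronized void markLoaded(String name, boolean markAll) {
        if (name != null) {
            ItemProgress item = this.loadingItems.remove(name);
            if (item != null) {
                this.loadedItems.put(name, item);
            }
        } else if (markAll) {
            this.loadedItems.putAll(this.loadingItems);
            this.loadingItems.clear();
        }
    }

    synchronized int loadedCount() {
        return this.loadedItems.size();
    }

    synchronized int loadingCount() {
        return this.loadingItems.size();
    }

    public static void main(String[] args) {
        InputProgressSketch progress = new InputProgressSketch();
        progress.addLoadingItem("a.csv", new ItemProgress());
        progress.addLoadingItem("b.csv", new ItemProgress());
        progress.loadingItem("a.csv").offset(42L); // per-file offset
        progress.markLoaded("a.csv", false);       // only a.csv is done
        progress.markLoaded(null, true);           // everything else is done
        System.out.println(progress.loadedCount());
    }
}
```

Because each offset lives on its own item, a single loadingOffset() no longer has a well-defined meaning once several files load at once, which matches the reason given for removing it.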
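InputProgressDeser.java
Old version
Set<InputItemProgress> loadedItems;
InputItemProgress loadingItem;
Uses a Set to store completed items and a single object for the item being loaded.
New version
Map<String, InputItemProgress> loadedItems;
Map<String, InputItemProgress> loadingItems;
Changed to Map (the key is a string such as a file name/ID), which keeps entries unique, allows fast lookup, and supports multiple concurrent "loading items".
It also uses:
Collections.synchronizedMap(InsertionOrderUtil.newMap());
to guarantee thread safety and preserve insertion order.
1 parent 852d76a commit 8a936a2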
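The combination of a synchronized wrapper and an insertion-ordered map can be tried out with JDK classes alone. In the sketch below, LinkedHashMap stands in for InsertionOrderUtil.newMap(); that substitution is an assumption made for illustration, and the class and helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class; not part of the loader.
class SyncOrderedMapSketch {

    // Assumption: LinkedHashMap plays the role of InsertionOrderUtil.newMap().
    static <K, V> Map<K, V> newSyncOrderedMap() {
        return Collections.synchronizedMap(new LinkedHashMap<K, V>());
    }

    // Snapshot the keys in insertion order. Traversing a view of a
    // synchronizedMap should happen while holding the map's own lock.
    static <K, V> List<K> keysInOrder(Map<K, V> map) {
        synchronized (map) {
            return new ArrayList<>(map.keySet());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Long> offsets = newSyncOrderedMap();
        offsets.put("part-2.csv", 0L);
        offsets.put("part-1.csv", 0L);

        // Several workers write distinct keys concurrently.
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    offsets.put("w" + id + "-" + j, (long) j);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }

        System.out.println(offsets.size());
        System.out.println(keysInOrder(offsets).subList(0, 2));
    }
}
```

Individual put and get calls are safe through the wrapper, but iteration is not atomic on its own, so callers that walk the map still need the explicit synchronized block shown in keysInOrder.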
File tree
69 files changed, +4058 -679 lines changed
- hugegraph-dist/scripts/dependency
- hugegraph-loader
- src
- main/java/org/apache/hugegraph/loader
- builder
- constant
- direct/loader
- executor
- failure
- filter
- util
- flink
- mapping
- progress
- reader
- file
- graph
- hdfs
- jdbc
- kafka
- serializer
- source
- file
- graph
- jdbc
- spark
- task
- util
- test
- java/org/apache/hugegraph/loader/test
- functional
- unit
- resources
- hdfs_file_with_prefix
- hdfs_with_core_site_path
- hdfs_with_empty_core_site_path
- hdfs_with_unexist_core_site_path