This commit is contained in:
chenyawei 2025-01-21 13:55:04 +08:00
parent fefc25827e
commit ec369eccb4
5 changed files with 63 additions and 40 deletions

View File

@ -86,9 +86,9 @@ public class GraphCommonService {
Session session = null;
try {
session = sessionPool.borrow(false);
log.info("执行 executeJson 方法gql={}", gql.substring(0, 100));
// log.info("执行 executeJson 方法gql={}", gql.substring(0, 100));
final String data = session.executeJson(gql);
log.info("执行检索语句完毕: {}", msg);
// log.info("执行检索语句完毕: {}", msg);
final JSONObject jsonObject = JSON.parseObject(data);
// 查询语句异常分析, 根据 json 返回结果解析 error 节点信息
final JSONObject error0 = jsonObject.getJSONArray("errors").getJSONObject(0);

View File

@ -43,7 +43,7 @@ public class RelationCompletion {
10, 20, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
private String spaceName;
private Integer limitNUm;
private int limitNUm;
public RelationCompletion(GraphCommonService graphCommonService,
NebulaGraphProperties nebulaGraphProperties) {
@ -69,54 +69,73 @@ public class RelationCompletion {
* 关系反转入口函数
*/
public void relationCompletion() {
log.info("开始进行关系补全....");
// 获取各个节点和边的数量
ShowStatsVo stats = showStats();
Map<String, Integer> tagNumsMap = getTagAndNums(stats);
// 获取节点id列表
Map<String, List<String>> map = new HashMap<>();
// 新起一个nebula线程
// 新起一个nebula session
SessionPool sessionPool = graphCommonService.newPoolConfig();
for (TagEnum tagEnum : TagEnum.values()) {
String tag = tagEnum.getTag();
log.info("开启{}类型的点为起始节点反向关系补全", tag);
List<NebulaMultiMatchJsonResult> idJsonResults =
graphCommonService.executeJson(NebulaUtil.getTagIds(spaceName, tag), NebulaMultiMatchJsonResult.class);
List<String> idLists = NebulaMultiMatchJsonResult.getIdLists(idJsonResults.get(0));
map.put(tag, idLists);
// 遍历边
EdgeEnum[] edgeEnums = EdgeEnum.values();
AtomicLong atomicLong = new AtomicLong(0);
Stream.of(edgeEnums)
.forEach(edgeEnum -> {
log.info("开启{}类型的点, 为起始节点; {}类型的边的反向关系补全, ", tag, edgeEnum.getEdgeType(),
"反向关系为: {}", edgeEnum.getReverseRelation());
CompletableFuture<Void> reverseFuture = CompletableFuture.runAsync(() -> {
String srcTag = edgeEnum.getSrcTag();
String edgeType = edgeEnum.getEdgeType();
String reverseRelation = edgeEnum.getReverseRelation();
for (String id : idLists) {
atomicLong.incrementAndGet();
if (atomicLong.get() / 1000 == 0) {
log.info("进行{}类型的点, 为起始节点; {}类型的边的反向关系补全, ", tag, edgeEnum.getEdgeType(),
"反向关系为: {}", edgeEnum.getReverseRelation(), "进度: {}", atomicLong.get(), "/", idLists.size());
int skip = 0;
while (true) {
List<NebulaMultiMatchJsonResult> idJsonResults =
graphCommonService.executeJson(NebulaUtil.getTagIdsWithLimit(spaceName, tag, skip, limitNUm),
NebulaMultiMatchJsonResult.class);
List<String> idLists = NebulaMultiMatchJsonResult.getIdLists(idJsonResults.get(0));
map.put(tag, idLists);
// 遍历边
EdgeEnum[] edgeEnums = EdgeEnum.values();
AtomicLong atomicLong = new AtomicLong(0);
Stream.of(edgeEnums)
.forEach(edgeEnum -> {
log.info("开启{}类型的点, 为起始节点; {}类型的边的反向关系补全, ", tag, edgeEnum.getEdgeType() +
" 反向关系为: {}", edgeEnum.getReverseRelation());
CompletableFuture<Void> reverseFuture = CompletableFuture.runAsync(() -> {
String srcTag = edgeEnum.getSrcTag();
String edgeType = edgeEnum.getEdgeType();
String reverseRelation = edgeEnum.getReverseRelation();
for (String id : idLists) {
atomicLong.incrementAndGet();
if (atomicLong.get() / 1000 == 0) {
log.info("进行{}类型的点, 为起始节点; {}类型的边的反向关系补全, ", tag, edgeEnum.getEdgeType() +
"反向关系为: {}", edgeEnum.getReverseRelation() + " 进度: {}", atomicLong.get() + "/" + idLists.size());
}
List<NebulaVertexJsonResult> vertexJsonResults = graphCommonService.executeJsonWithoutLogs(
NebulaUtil.getDstVerticesById(spaceName, srcTag, id, edgeType),
NebulaVertexJsonResult.class,
"",
sessionPool);
SearchSubgraphVo searchSubgraphVo = NebulaUtil.getSearchSubgraphVo(vertexJsonResults.get(0));
// 构造反向关系并插入nebula
convertEdgeDirection(searchSubgraphVo, sessionPool, reverseRelation);
}
List<NebulaVertexJsonResult> vertexJsonResults = graphCommonService.executeJsonWithoutLogs(
NebulaUtil.getDstVerticesById(spaceName, srcTag, id, edgeType),
NebulaVertexJsonResult.class,
"",
sessionPool);
SearchSubgraphVo searchSubgraphVo = NebulaUtil.getSearchSubgraphVo(vertexJsonResults.get(0));
// 构造反向关系并插入nebula
convertEdgeDirection(searchSubgraphVo, sessionPool, reverseRelation);
}
}, threadPoolExecutor);
reverseFuture.join();
atomicLong.set(0);
});
}, threadPoolExecutor);
reverseFuture.join();
atomicLong.set(0);
});
skip = skip + limitNUm;
if (skip >= tagNumsMap.get(tag)) {
break;
}
}
}
sessionPool.release();
threadPoolExecutor.shutdown();
}
private Map<String, Integer> getTagAndNums(ShowStatsVo stats) {
Map<String, Integer> map = new HashMap<>();
for (ShowStatsVo.Data statsNum : stats.getNums()) {
map.put(statsNum.getType(), Integer.valueOf(statsNum.getCount()));
}
return map;
}
/**
* 构造反向关系
* @param vo

View File

@ -26,5 +26,5 @@ public class NebulaGraphProperties {
private int timeout;
private int idleTime;
private String spaceName;
private Integer limitNum;
private int limitNum;
}

View File

@ -24,6 +24,10 @@ public class NebulaUtil {
return String.format("use %s; match (v:%s) return id(v);", spaceName, tag);
}
public static String getTagIdsWithLimit(String spaceName, String tag, int skip, int limit) {
return String.format("use %s; match (v:%s) return id(v) skip %s limit %s;", spaceName, tag, skip, limit);
}
public static String showStats(String spaceName) {
return String.format("use %s; show stats;", spaceName);
}

View File

@ -1,10 +1,10 @@
nebula:
userName: root
password: nebula
hostAddresses: 172.16.20.2:9669,172.16.20.24:9669,172.16.20.5:9669
hostAddresses: 172.16.20.2:9669,172.16.20.4:9669,172.16.20.5:9669
minConnsSize: 20
maxConnSize: 100
timeout: 600000
idleTime: 180000
spaceName: Y24_1206
limitNum: 1000000
limitNum: 10000 #