Compare commits
2 Commits
564b35a68d
...
6c17cf12a7
| Author | SHA1 | Date | |
|---|---|---|---|
| 6c17cf12a7 | |||
| d7204933a7 |
|
|
@ -37,7 +37,6 @@ public class GraphCommonService {
|
||||||
Session session;
|
Session session;
|
||||||
|
|
||||||
public <T> List<T> executeJson(String gql, Class<T> voClass) {
|
public <T> List<T> executeJson(String gql, Class<T> voClass) {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
log.info("执行 executeJson 方法,gql={}", gql);
|
log.info("执行 executeJson 方法,gql={}", gql);
|
||||||
final String data = session.executeJson(gql);
|
final String data = session.executeJson(gql);
|
||||||
|
|
@ -60,7 +59,6 @@ public class GraphCommonService {
|
||||||
log.error("executeJson ql[{}] error, msg [{}]", gql, e.getMessage());
|
log.error("executeJson ql[{}] error, msg [{}]", gql, e.getMessage());
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -81,7 +79,6 @@ public class GraphCommonService {
|
||||||
Session session = null;
|
Session session = null;
|
||||||
try {
|
try {
|
||||||
session = sessionPool.borrow(false);
|
session = sessionPool.borrow(false);
|
||||||
//
|
|
||||||
final String data = session.executeJson(gql);
|
final String data = session.executeJson(gql);
|
||||||
if (isLog) {
|
if (isLog) {
|
||||||
log.info("执行 executeJson 方法,gql={}", gql.substring(0, 100));
|
log.info("执行 executeJson 方法,gql={}", gql.substring(0, 100));
|
||||||
|
|
@ -99,7 +96,11 @@ public class GraphCommonService {
|
||||||
JSONArray results = JSONUtil.parseArray(jsonObject.get("results"));
|
JSONArray results = JSONUtil.parseArray(jsonObject.get("results"));
|
||||||
return JSONUtil.toList(results, voClass);
|
return JSONUtil.toList(results, voClass);
|
||||||
} catch (IOErrorException e) {
|
} catch (IOErrorException e) {
|
||||||
|
if (isLog) {
|
||||||
|
log.error("executeJson ql[{}] error, msg [{}]", gql.substring(0, 100), e.getMessage());
|
||||||
|
} else {
|
||||||
log.error("executeJson ql[{}] error, msg [{}]", gql, e.getMessage());
|
log.error("executeJson ql[{}] error, msg [{}]", gql, e.getMessage());
|
||||||
|
}
|
||||||
throw new GraphExecuteException("execute gql error, gql: " + gql, e);
|
throw new GraphExecuteException("execute gql error, gql: " + gql, e);
|
||||||
} finally {
|
} finally {
|
||||||
session.release();
|
session.release();
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,7 @@ public class RelationCompletion {
|
||||||
private final String spaceName;
|
private final String spaceName;
|
||||||
private final int limitNum;
|
private final int limitNum;
|
||||||
private static final String ALL_NUM = "allGetNum";
|
private static final String ALL_NUM = "allGetNum";
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(RelationCompletion.class);
|
||||||
|
|
||||||
public RelationCompletion(GraphCommonService graphCommonService,
|
public RelationCompletion(GraphCommonService graphCommonService,
|
||||||
NebulaGraphProperties nebulaGraphProperties) {
|
NebulaGraphProperties nebulaGraphProperties) {
|
||||||
|
|
@ -52,8 +53,6 @@ public class RelationCompletion {
|
||||||
10, 10, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
|
10, 10, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Logger log = LoggerFactory.getLogger(RelationCompletion.class);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 关系反转入口函数
|
* 关系反转入口函数
|
||||||
*/
|
*/
|
||||||
|
|
@ -65,21 +64,23 @@ public class RelationCompletion {
|
||||||
Map<String, Integer> tagNumsMap = getTagAndNums(stats);
|
Map<String, Integer> tagNumsMap = getTagAndNums(stats);
|
||||||
// 新起一个nebula session池
|
// 新起一个nebula session池
|
||||||
SessionPool sessionPool = graphCommonService.newPoolConfig();
|
SessionPool sessionPool = graphCommonService.newPoolConfig();
|
||||||
|
boolean isComplete = false;
|
||||||
|
while (true) {
|
||||||
|
if (isComplete) {
|
||||||
|
log.info("关系补全全部结束...");
|
||||||
|
break;
|
||||||
|
}
|
||||||
for (TagEnum tagEnum : TagEnum.values()) {
|
for (TagEnum tagEnum : TagEnum.values()) {
|
||||||
|
|
||||||
Object allGetNum = FileCacheManager.get(ALL_NUM, Integer.class);
|
Object allGetNum = FileCacheManager.get(ALL_NUM, Integer.class);
|
||||||
if (allGetNum != null && (Integer) allGetNum >= 5) {
|
if (allGetNum != null && (Integer) allGetNum >= 5) {
|
||||||
|
isComplete = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
String tag = tagEnum.getTag();
|
String tag = tagEnum.getTag();
|
||||||
|
|
||||||
Integer num = FileCacheManager.get(tag + "_num", Integer.class);
|
Integer num = FileCacheManager.get(tag + "_num", Integer.class);
|
||||||
// Object num = redisClient.get(tag + "_num");
|
// Object num = redisClient.get(tag + "_num");
|
||||||
|
|
||||||
Integer skip = (num == null ? 0 : num);
|
Integer skip = (num == null ? 0 : num);
|
||||||
|
|
||||||
if (tagNumsMap.get(tag) < skip) {
|
if (tagNumsMap.get(tag) < skip) {
|
||||||
FileCacheManager.increment(ALL_NUM);
|
FileCacheManager.increment(ALL_NUM);
|
||||||
// redisClient.increment(ALL_NUM);
|
// redisClient.increment(ALL_NUM);
|
||||||
|
|
@ -109,15 +110,11 @@ public class RelationCompletion {
|
||||||
Integer finalSkip = skip;
|
Integer finalSkip = skip;
|
||||||
CompletableFuture<Void> setVertexAndEdgeTotalsMap = CompletableFuture.supplyAsync(() -> {
|
CompletableFuture<Void> setVertexAndEdgeTotalsMap = CompletableFuture.supplyAsync(() -> {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
|
||||||
String srcTag = edgeEnum.getSrcTag();
|
String srcTag = edgeEnum.getSrcTag();
|
||||||
String edgeType = edgeEnum.getEdgeType();
|
String edgeType = edgeEnum.getEdgeType();
|
||||||
String reverseRelation = edgeEnum.getReverseRelation();
|
String reverseRelation = edgeEnum.getReverseRelation();
|
||||||
|
|
||||||
Instant now = Instant.now();
|
Instant now = Instant.now();
|
||||||
|
|
||||||
|
|
||||||
List<NebulaVertexJsonResult> vertexJsonResults = graphCommonService.executeJsonWithoutLogs(
|
List<NebulaVertexJsonResult> vertexJsonResults = graphCommonService.executeJsonWithoutLogs(
|
||||||
NebulaUtil.getDstVertices(spaceName, srcTag, idLists, edgeType),
|
NebulaUtil.getDstVertices(spaceName, srcTag, idLists, edgeType),
|
||||||
NebulaVertexJsonResult.class,
|
NebulaVertexJsonResult.class,
|
||||||
|
|
@ -129,12 +126,9 @@ public class RelationCompletion {
|
||||||
convertEdgeDirection(searchSubgraphVo, sessionPool, reverseRelation);
|
convertEdgeDirection(searchSubgraphVo, sessionPool, reverseRelation);
|
||||||
log.info("{}类型的点; {}类型的边的反向关系补全, 反向关系为: {} 耗时为:{}", tag, edgeEnum.getEdgeType()
|
log.info("{}类型的点; {}类型的边的反向关系补全, 反向关系为: {} 耗时为:{}", tag, edgeEnum.getEdgeType()
|
||||||
, edgeEnum.getReverseRelation(), Duration.between(now, Instant.now()));
|
, edgeEnum.getReverseRelation(), Duration.between(now, Instant.now()));
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
||||||
log.info("反向关系补全 edgeEnum:{} 失败", edgeEnum, e);
|
log.info("反向关系补全 edgeEnum:{} 失败", edgeEnum, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("{}类型的点为起始节点{}到第{}条数据反向关系补全完成", tag, finalSkip, finalSkip + this.limitNum);
|
log.info("{}类型的点为起始节点{}到第{}条数据反向关系补全完成", tag, finalSkip, finalSkip + this.limitNum);
|
||||||
return null;
|
return null;
|
||||||
}, threadPoolExecutor);
|
}, threadPoolExecutor);
|
||||||
|
|
@ -147,7 +141,7 @@ public class RelationCompletion {
|
||||||
FileCacheManager.set(tag + "_num", skip);
|
FileCacheManager.set(tag + "_num", skip);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private Map<String, Integer> getTagAndNums(ShowStatsVo stats) {
|
private Map<String, Integer> getTagAndNums(ShowStatsVo stats) {
|
||||||
Map<String, Integer> map = new HashMap<>();
|
Map<String, Integer> map = new HashMap<>();
|
||||||
|
|
@ -170,8 +164,14 @@ public class RelationCompletion {
|
||||||
log.info("reverseRelation {} 仅有节点数据,无需关系补全", reverseRelation);
|
log.info("reverseRelation {} 仅有节点数据,无需关系补全", reverseRelation);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// 插入边
|
||||||
|
insertEdge(links, sessionPool, reverseRelation);
|
||||||
|
log.info("reverseRelation {} 存在边关系,已关系补全", reverseRelation);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void insertEdge(List< SearchSubgraphVo.EdgeVo> links, SessionPool sessionPool, String reverseRelation) {
|
||||||
// TODO 处理请求参数超出服务端设置的大小时的情况,服务端默认4MB,超出会报异常
|
// TODO 处理请求参数超出服务端设置的大小时的情况,服务端默认4MB,超出会报异常
|
||||||
|
try {
|
||||||
graphCommonService.executeJsonWithoutLogs(
|
graphCommonService.executeJsonWithoutLogs(
|
||||||
NebulaUtil.insertEdges(spaceName, links, reverseRelation),
|
NebulaUtil.insertEdges(spaceName, links, reverseRelation),
|
||||||
CommonVo.class,
|
CommonVo.class,
|
||||||
|
|
@ -179,7 +179,12 @@ public class RelationCompletion {
|
||||||
sessionPool,
|
sessionPool,
|
||||||
true
|
true
|
||||||
);
|
);
|
||||||
log.info("reverseRelation {} 存在边关系,已关系补全", reverseRelation);
|
} catch (Exception e) {
|
||||||
|
log.warn("请求参数大小超出服务端限制,减少参数重新发送...");
|
||||||
|
int mid = links.size() / 2;
|
||||||
|
insertEdge(links.subList(0, mid), sessionPool, reverseRelation);
|
||||||
|
insertEdge(links.subList(mid, links.size()), sessionPool, reverseRelation);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public ShowStatsVo showStats() {
|
public ShowStatsVo showStats() {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user