Compare commits

...

2 Commits

2 changed files with 95 additions and 89 deletions

View File

@ -37,7 +37,6 @@ public class GraphCommonService {
Session session; Session session;
public <T> List<T> executeJson(String gql, Class<T> voClass) { public <T> List<T> executeJson(String gql, Class<T> voClass) {
try { try {
log.info("执行 executeJson 方法gql={}", gql); log.info("执行 executeJson 方法gql={}", gql);
final String data = session.executeJson(gql); final String data = session.executeJson(gql);
@ -60,7 +59,6 @@ public class GraphCommonService {
log.error("executeJson ql[{}] error, msg [{}]", gql, e.getMessage()); log.error("executeJson ql[{}] error, msg [{}]", gql, e.getMessage());
throw e; throw e;
} }
} }
@ -81,7 +79,6 @@ public class GraphCommonService {
Session session = null; Session session = null;
try { try {
session = sessionPool.borrow(false); session = sessionPool.borrow(false);
//
final String data = session.executeJson(gql); final String data = session.executeJson(gql);
if (isLog) { if (isLog) {
log.info("执行 executeJson 方法gql={}", gql.substring(0, 100)); log.info("执行 executeJson 方法gql={}", gql.substring(0, 100));
@ -99,7 +96,11 @@ public class GraphCommonService {
JSONArray results = JSONUtil.parseArray(jsonObject.get("results")); JSONArray results = JSONUtil.parseArray(jsonObject.get("results"));
return JSONUtil.toList(results, voClass); return JSONUtil.toList(results, voClass);
} catch (IOErrorException e) { } catch (IOErrorException e) {
if (isLog) {
log.error("executeJson ql[{}] error, msg [{}]", gql.substring(0, 100), e.getMessage());
} else {
log.error("executeJson ql[{}] error, msg [{}]", gql, e.getMessage()); log.error("executeJson ql[{}] error, msg [{}]", gql, e.getMessage());
}
throw new GraphExecuteException("execute gql error, gql: " + gql, e); throw new GraphExecuteException("execute gql error, gql: " + gql, e);
} finally { } finally {
session.release(); session.release();

View File

@ -42,6 +42,7 @@ public class RelationCompletion {
private final String spaceName; private final String spaceName;
private final int limitNum; private final int limitNum;
private static final String ALL_NUM = "allGetNum"; private static final String ALL_NUM = "allGetNum";
private static final Logger log = LoggerFactory.getLogger(RelationCompletion.class);
public RelationCompletion(GraphCommonService graphCommonService, public RelationCompletion(GraphCommonService graphCommonService,
NebulaGraphProperties nebulaGraphProperties) { NebulaGraphProperties nebulaGraphProperties) {
@ -52,8 +53,6 @@ public class RelationCompletion {
10, 10, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); 10, 10, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
} }
private static Logger log = LoggerFactory.getLogger(RelationCompletion.class);
/** /**
* 关系反转入口函数 * 关系反转入口函数
*/ */
@ -65,21 +64,23 @@ public class RelationCompletion {
Map<String, Integer> tagNumsMap = getTagAndNums(stats); Map<String, Integer> tagNumsMap = getTagAndNums(stats);
// 新起一个nebula session池 // 新起一个nebula session池
SessionPool sessionPool = graphCommonService.newPoolConfig(); SessionPool sessionPool = graphCommonService.newPoolConfig();
boolean isComplete = false;
while (true) {
if (isComplete) {
log.info("关系补全全部结束...");
break;
}
for (TagEnum tagEnum : TagEnum.values()) { for (TagEnum tagEnum : TagEnum.values()) {
Object allGetNum = FileCacheManager.get(ALL_NUM, Integer.class); Object allGetNum = FileCacheManager.get(ALL_NUM, Integer.class);
if (allGetNum != null && (Integer) allGetNum >= 5) { if (allGetNum != null && (Integer) allGetNum >= 5) {
isComplete = true;
break; break;
} }
String tag = tagEnum.getTag(); String tag = tagEnum.getTag();
Integer num = FileCacheManager.get(tag + "_num", Integer.class); Integer num = FileCacheManager.get(tag + "_num", Integer.class);
// Object num = redisClient.get(tag + "_num"); // Object num = redisClient.get(tag + "_num");
Integer skip = (num == null ? 0 : num); Integer skip = (num == null ? 0 : num);
if (tagNumsMap.get(tag) < skip) { if (tagNumsMap.get(tag) < skip) {
FileCacheManager.increment(ALL_NUM); FileCacheManager.increment(ALL_NUM);
// redisClient.increment(ALL_NUM); // redisClient.increment(ALL_NUM);
@ -109,15 +110,11 @@ public class RelationCompletion {
Integer finalSkip = skip; Integer finalSkip = skip;
CompletableFuture<Void> setVertexAndEdgeTotalsMap = CompletableFuture.supplyAsync(() -> { CompletableFuture<Void> setVertexAndEdgeTotalsMap = CompletableFuture.supplyAsync(() -> {
try { try {
String srcTag = edgeEnum.getSrcTag(); String srcTag = edgeEnum.getSrcTag();
String edgeType = edgeEnum.getEdgeType(); String edgeType = edgeEnum.getEdgeType();
String reverseRelation = edgeEnum.getReverseRelation(); String reverseRelation = edgeEnum.getReverseRelation();
Instant now = Instant.now(); Instant now = Instant.now();
List<NebulaVertexJsonResult> vertexJsonResults = graphCommonService.executeJsonWithoutLogs( List<NebulaVertexJsonResult> vertexJsonResults = graphCommonService.executeJsonWithoutLogs(
NebulaUtil.getDstVertices(spaceName, srcTag, idLists, edgeType), NebulaUtil.getDstVertices(spaceName, srcTag, idLists, edgeType),
NebulaVertexJsonResult.class, NebulaVertexJsonResult.class,
@ -129,12 +126,9 @@ public class RelationCompletion {
convertEdgeDirection(searchSubgraphVo, sessionPool, reverseRelation); convertEdgeDirection(searchSubgraphVo, sessionPool, reverseRelation);
log.info("{}类型的点; {}类型的边的反向关系补全, 反向关系为: {} 耗时为:{}", tag, edgeEnum.getEdgeType() log.info("{}类型的点; {}类型的边的反向关系补全, 反向关系为: {} 耗时为:{}", tag, edgeEnum.getEdgeType()
, edgeEnum.getReverseRelation(), Duration.between(now, Instant.now())); , edgeEnum.getReverseRelation(), Duration.between(now, Instant.now()));
} catch (Exception e) { } catch (Exception e) {
log.info("反向关系补全 edgeEnum:{} 失败", edgeEnum, e); log.info("反向关系补全 edgeEnum:{} 失败", edgeEnum, e);
} }
log.info("{}类型的点为起始节点{}到第{}条数据反向关系补全完成", tag, finalSkip, finalSkip + this.limitNum); log.info("{}类型的点为起始节点{}到第{}条数据反向关系补全完成", tag, finalSkip, finalSkip + this.limitNum);
return null; return null;
}, threadPoolExecutor); }, threadPoolExecutor);
@ -147,7 +141,7 @@ public class RelationCompletion {
FileCacheManager.set(tag + "_num", skip); FileCacheManager.set(tag + "_num", skip);
} }
} }
}
private Map<String, Integer> getTagAndNums(ShowStatsVo stats) { private Map<String, Integer> getTagAndNums(ShowStatsVo stats) {
Map<String, Integer> map = new HashMap<>(); Map<String, Integer> map = new HashMap<>();
@ -170,8 +164,14 @@ public class RelationCompletion {
log.info("reverseRelation {} 仅有节点数据,无需关系补全", reverseRelation); log.info("reverseRelation {} 仅有节点数据,无需关系补全", reverseRelation);
return; return;
} }
// 插入边
insertEdge(links, sessionPool, reverseRelation);
log.info("reverseRelation {} 存在边关系,已关系补全", reverseRelation);
}
private void insertEdge(List< SearchSubgraphVo.EdgeVo> links, SessionPool sessionPool, String reverseRelation) {
// TODO 处理请求参数超出服务端设置的大小时的情况服务端默认4MB超出会报异常 // TODO 处理请求参数超出服务端设置的大小时的情况服务端默认4MB超出会报异常
try {
graphCommonService.executeJsonWithoutLogs( graphCommonService.executeJsonWithoutLogs(
NebulaUtil.insertEdges(spaceName, links, reverseRelation), NebulaUtil.insertEdges(spaceName, links, reverseRelation),
CommonVo.class, CommonVo.class,
@ -179,7 +179,12 @@ public class RelationCompletion {
sessionPool, sessionPool,
true true
); );
log.info("reverseRelation {} 存在边关系,已关系补全", reverseRelation); } catch (Exception e) {
log.warn("请求参数大小超出服务端限制,减少参数重新发送...");
int mid = links.size() / 2;
insertEdge(links.subList(0, mid), sessionPool, reverseRelation);
insertEdge(links.subList(mid, links.size()), sessionPool, reverseRelation);
}
} }
public ShowStatsVo showStats() { public ShowStatsVo showStats() {