处理请求参数大小超出服务端限制的情况

This commit is contained in:
chenyawei 2025-02-10 11:10:37 +08:00
parent d7204933a7
commit 6c17cf12a7

View File

@ -42,6 +42,7 @@ public class RelationCompletion {
private final String spaceName;
private final int limitNum;
private static final String ALL_NUM = "allGetNum";
private static final Logger log = LoggerFactory.getLogger(RelationCompletion.class);
public RelationCompletion(GraphCommonService graphCommonService,
NebulaGraphProperties nebulaGraphProperties) {
@ -52,8 +53,6 @@ public class RelationCompletion {
10, 10, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
}
private static Logger log = LoggerFactory.getLogger(RelationCompletion.class);
/**
* 关系反转入口函数
*/
@ -65,90 +64,85 @@ public class RelationCompletion {
Map<String, Integer> tagNumsMap = getTagAndNums(stats);
// 新起一个nebula session池
SessionPool sessionPool = graphCommonService.newPoolConfig();
for (TagEnum tagEnum : TagEnum.values()) {
Object allGetNum = FileCacheManager.get(ALL_NUM, Integer.class);
if (allGetNum != null && (Integer) allGetNum >= 5) {
boolean isComplete = false;
while (true) {
if (isComplete) {
log.info("关系补全全部结束...");
break;
}
for (TagEnum tagEnum : TagEnum.values()) {
Object allGetNum = FileCacheManager.get(ALL_NUM, Integer.class);
if (allGetNum != null && (Integer) allGetNum >= 5) {
isComplete = true;
break;
}
String tag = tagEnum.getTag();
String tag = tagEnum.getTag();
Integer num = FileCacheManager.get(tag + "_num", Integer.class);
// Object num = redisClient.get(tag + "_num");
Integer skip = (num == null ? 0 : num);
if (tagNumsMap.get(tag) < skip) {
FileCacheManager.increment(ALL_NUM);
// redisClient.increment(ALL_NUM);
continue;
}
log.info("开启{}类型的点为起始节点的第{}条数据到第{}条数据反向关系补全", tag, skip, skip + this.limitNum);
Integer num = FileCacheManager.get(tag + "_num", Integer.class);
// Object num = redisClient.get(tag + "_num");
List<NebulaMultiMatchJsonResult> idJsonResults =
graphCommonService.executeJson(NebulaUtil.getTagIdsWithLimit(spaceName, tag, skip, this.limitNum),
NebulaMultiMatchJsonResult.class);
Integer skip = (num == null ? 0 : num);
if (CollectionUtils.isEmpty(idJsonResults)) {
break;
}
if (tagNumsMap.get(tag) < skip) {
FileCacheManager.increment(ALL_NUM);
// redisClient.increment(ALL_NUM);
continue;
List<String> idLists = NebulaMultiMatchJsonResult.getIdLists(idJsonResults.get(0));
if (CollectionUtils.isEmpty(idLists)) {
log.info("idLists为空...");
break;
} else {
log.info("查询到的idLists大小: {}", idLists.size());
}
List<CompletableFuture<?>> futures = new ArrayList<>();
// 遍历边
EdgeEnum[] edgeEnums = EdgeEnum.values();
for (EdgeEnum edgeEnum : edgeEnums) {
Integer finalSkip = skip;
CompletableFuture<Void> setVertexAndEdgeTotalsMap = CompletableFuture.supplyAsync(() -> {
try {
String srcTag = edgeEnum.getSrcTag();
String edgeType = edgeEnum.getEdgeType();
String reverseRelation = edgeEnum.getReverseRelation();
Instant now = Instant.now();
List<NebulaVertexJsonResult> vertexJsonResults = graphCommonService.executeJsonWithoutLogs(
NebulaUtil.getDstVertices(spaceName, srcTag, idLists, edgeType),
NebulaVertexJsonResult.class,
"",
sessionPool);
SearchSubgraphVo searchSubgraphVo = NebulaUtil.getSearchSubgraphVo(vertexJsonResults.get(0));
// 构造反向关系并插入nebula
convertEdgeDirection(searchSubgraphVo, sessionPool, reverseRelation);
log.info("{}类型的点; {}类型的边的反向关系补全, 反向关系为: {} 耗时为:{}", tag, edgeEnum.getEdgeType()
, edgeEnum.getReverseRelation(), Duration.between(now, Instant.now()));
} catch (Exception e) {
log.info("反向关系补全 edgeEnum:{} 失败", edgeEnum, e);
}
log.info("{}类型的点为起始节点{}到第{}条数据反向关系补全完成", tag, finalSkip, finalSkip + this.limitNum);
return null;
}, threadPoolExecutor);
futures.add(setVertexAndEdgeTotalsMap);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 更新特定标签的数量
skip += this.limitNum;
FileCacheManager.set(tag + "_num", skip);
}
log.info("开启{}类型的点为起始节点的第{}条数据到第{}条数据反向关系补全", tag, skip, skip + this.limitNum);
List<NebulaMultiMatchJsonResult> idJsonResults =
graphCommonService.executeJson(NebulaUtil.getTagIdsWithLimit(spaceName, tag, skip, this.limitNum),
NebulaMultiMatchJsonResult.class);
if (CollectionUtils.isEmpty(idJsonResults)) {
break;
}
List<String> idLists = NebulaMultiMatchJsonResult.getIdLists(idJsonResults.get(0));
if (CollectionUtils.isEmpty(idLists)) {
log.info("idLists为空...");
break;
} else {
log.info("查询到的idLists大小: {}", idLists.size());
}
List<CompletableFuture<?>> futures = new ArrayList<>();
// 遍历边
EdgeEnum[] edgeEnums = EdgeEnum.values();
for (EdgeEnum edgeEnum : edgeEnums) {
Integer finalSkip = skip;
CompletableFuture<Void> setVertexAndEdgeTotalsMap = CompletableFuture.supplyAsync(() -> {
try {
String srcTag = edgeEnum.getSrcTag();
String edgeType = edgeEnum.getEdgeType();
String reverseRelation = edgeEnum.getReverseRelation();
Instant now = Instant.now();
List<NebulaVertexJsonResult> vertexJsonResults = graphCommonService.executeJsonWithoutLogs(
NebulaUtil.getDstVertices(spaceName, srcTag, idLists, edgeType),
NebulaVertexJsonResult.class,
"",
sessionPool);
SearchSubgraphVo searchSubgraphVo = NebulaUtil.getSearchSubgraphVo(vertexJsonResults.get(0));
// 构造反向关系并插入nebula
convertEdgeDirection(searchSubgraphVo, sessionPool, reverseRelation);
log.info("{}类型的点; {}类型的边的反向关系补全, 反向关系为: {} 耗时为:{}", tag, edgeEnum.getEdgeType()
, edgeEnum.getReverseRelation(), Duration.between(now, Instant.now()));
} catch (Exception e) {
log.info("反向关系补全 edgeEnum:{} 失败", edgeEnum, e);
}
log.info("{}类型的点为起始节点{}到第{}条数据反向关系补全完成", tag, finalSkip, finalSkip + this.limitNum);
return null;
}, threadPoolExecutor);
futures.add(setVertexAndEdgeTotalsMap);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 更新特定标签的数量
skip += this.limitNum;
FileCacheManager.set(tag + "_num", skip);
}
}
private Map<String, Integer> getTagAndNums(ShowStatsVo stats) {
Map<String, Integer> map = new HashMap<>();
for (ShowStatsVo.Data statsNum : stats.getNums()) {
@ -170,18 +164,29 @@ public class RelationCompletion {
log.info("reverseRelation {} 仅有节点数据,无需关系补全", reverseRelation);
return;
}
// TODO 处理请求参数超出服务端设置的大小时的情况服务端默认4MB超出会报异常
graphCommonService.executeJsonWithoutLogs(
NebulaUtil.insertEdges(spaceName, links, reverseRelation),
CommonVo.class,
"",
sessionPool,
true
);
// 插入边
insertEdge(links, sessionPool, reverseRelation);
log.info("reverseRelation {} 存在边关系,已关系补全", reverseRelation);
}
private void insertEdge(List< SearchSubgraphVo.EdgeVo> links, SessionPool sessionPool, String reverseRelation) {
// TODO 处理请求参数超出服务端设置的大小时的情况服务端默认4MB超出会报异常
try {
graphCommonService.executeJsonWithoutLogs(
NebulaUtil.insertEdges(spaceName, links, reverseRelation),
CommonVo.class,
"",
sessionPool,
true
);
} catch (Exception e) {
log.warn("请求参数大小超出服务端限制,减少参数重新发送...");
int mid = links.size() / 2;
insertEdge(links.subList(0, mid), sessionPool, reverseRelation);
insertEdge(links.subList(mid, links.size()), sessionPool, reverseRelation);
}
}
public ShowStatsVo showStats() {
// 先创建job执行再show stats
List<ShowStats> res = graphCommonService.executeJson(NebulaUtil.showStats(spaceName), ShowStats.class);