Compare commits

..

No commits in common. "d4457be1c6346ae767a16cd87b878eaeb890aa41" and "709e1e31f938d86dbb96aa8eb7c3df37b3d6fe83" have entirely different histories.

2 changed files with 48 additions and 79 deletions

5
.gitignore vendored
View File

@ -5,11 +5,9 @@ target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/misc.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
.idea/inspectionProfiles/Project_Default.xml
*.iws
*.iml
*.ipr
@ -37,5 +35,4 @@ build/
.vscode/
### Mac OS ###
.DS_Store
/.idea/
.DS_Store

View File

@ -35,16 +35,19 @@ public class GeneratorTestData {
);
private static final Logger log = LoggerFactory.getLogger(GeneratorTestData.class);
/**
 * Produces an identifier by passing a fixed pattern of question marks to
 * {@code faker.bothify}.
 *
 * <p>NOTE(review): presumably each {@code ?} is replaced by a random letter,
 * per Faker's bothify convention; confirm against the Faker version in use.
 * Uniqueness is not enforced here.
 *
 * @return the string returned by {@code faker.bothify} for the pattern
 */
private static String generateId() {
    final String idPattern = "????????????????????????????????????????";
    final String generated = faker.bothify(idPattern);
    return generated;
}
public static final int printNum = 1000;
public static final int circleNum = 10;
public static final int smallNum = 1;
public static final int circleNum = 3;
private static Map<String, List<String>> ids = new HashMap<>();
public static String outputDir = "";
public static void main(String[] args) throws IOException {
Map<String, List<String>> ids = new HashMap<>();
if (args != null && args.length >= 1) {
for (String arg : args) {
outputDir = arg;
@ -56,72 +59,31 @@ public class GeneratorTestData {
}
// 创建目录
Files.createDirectories(Paths.get(outputDir));
// 创建tag线程池
ThreadPoolExecutor tagExecutor = new ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<>(), // workQueue 不指定初始大小默认无界
Executors.defaultThreadFactory() // threadFactory
);
// 创建edge线程池
ThreadPoolExecutor edgeExecutor = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<>(), // workQueue 不指定初始大小默认无界
Executors.defaultThreadFactory() // threadFactory
);
for (int i = 0; i < circleNum; i++) {
Map<String, List<String>> ids = new HashMap<>();
generateData(tagExecutor, ids, i);
generateEdge(edgeExecutor, ids, i);
generateData(ids, i);
generateEdge(i);
ids = new HashMap<>();
}
// 关闭线程不再接受新的任务
showdownExecutor(tagExecutor);
showdownExecutor(edgeExecutor);
log.info("所有数据生成执行结束....");
}
/**
 * Shuts down the given thread pool and waits up to one minute for it to terminate.
 *
 * <p>{@code shutdown()} is called first so the pool accepts no new tasks. If the
 * pool has not terminated after the wait, an error is logged and the method
 * returns without cancelling anything. If the waiting thread is interrupted,
 * the interruption is logged and the thread's interrupt flag is restored.
 *
 * @param executor the thread pool to shut down
 */
private static void showdownExecutor(ExecutorService executor) {
    executor.shutdown();
    // Nothing to wait for when the pool is already fully terminated.
    if (executor.isTerminated()) {
        return;
    }
    try {
        if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
            log.error("Some tasks did not complete within the specified time.");
        }
    } catch (InterruptedException e) {
        // Report through the class logger instead of printStackTrace() so the
        // failure lands in the same place as every other message.
        log.error("Interrupted while waiting for the executor to terminate.", e);
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
    }
}
private static void generateData(ExecutorService tagExecutor, Map<String, List<String>> ids, int count) {
private static void generateData(Map<String, List<String>> ids, int count) {
ExecutorService tagExecutor = Executors.newFixedThreadPool(5);
// 使用一个列表来保存所有的Future对象
List<Future<?>> tagFutures = new ArrayList<>();
for (Tag value : Tag.values()) {
String finalOutputDir = outputDir;
Future<?> future = tagExecutor.submit(() -> {
// 生成点数据
List<Map<String, String>> list = generateData(value.getNum(), value.getTag(), ids, count);
List<Map<String, String>> list = generateData(value.getNum(), value.getTag(), ids);
// 写入点数据
writeToCsv(list, finalOutputDir + "/" + value.getTag() + "-" + count + "s.csv");
});
tagFutures.add(future);
}
// 关闭线程不再接受新的任务
tagExecutor.shutdown();
try {
for (Future<?> tagFuture : tagFutures) {
tagFuture.get();
@ -133,22 +95,34 @@ public class GeneratorTestData {
Thread.currentThread().interrupt();
}
log.info("点数据已成功写入 CSV 文件");
// 确保线程池在程序结束前完全关闭
if (!tagExecutor.isTerminated()) {
try {
if (!tagExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
log.error("Some tasks did not complete within the specified time.");
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
private static void generateEdge(ExecutorService edgeExecutor, Map<String, List<String>> ids, int count) {
private static void generateEdge(int count) {
// 生成并写入边数据
ExecutorService edgeExecutor = Executors.newFixedThreadPool(10);
List<Future<?>> edgeFutures = new ArrayList<>();
for (EdgeToTagEnums value : EdgeToTagEnums.values()) {
String finalOutputDir = outputDir;
Future<?> edgeFuture = edgeExecutor.submit(() -> {
String type = value.getType();
List<Map<String, String>> edgeData = generateEdge(ids.get(value.getSrcTag()),
ids.get(value.getDstTag()), type, 100, count);
List<Map<String, String>> edgeData = generateEdge(ids.get(value.getSrcTag()), ids.get(value.getDstTag()), type, 100);
// 开始写入边数据
writeToCsv(edgeData, finalOutputDir + "/" + type + "-" + count + ".csv");
});
edgeFutures.add(edgeFuture);
}
edgeExecutor.shutdown();
try {
for (Future<?> edgeFuture : edgeFutures) {
edgeFuture.get();
@ -159,23 +133,30 @@ public class GeneratorTestData {
// 如果主线程被中断恢复中断状态
Thread.currentThread().interrupt();
}
// 确保线程池在程序结束前完全关闭
if (!edgeExecutor.isTerminated()) {
try {
if (!edgeExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
log.error("Some tasks did not complete within the specified time.");
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
private static List<Map<String, String>> generateData(int num, String tag, Map<String, List<String>> ids, int count) {
private static List<Map<String, String>> generateData(int num, String tag, Map<String, List<String>> ids) {
List<Map<String, String>> res = new ArrayList<>();
log.info("传入的参数是: " + tag);
if (Tag.TASK.getTag().equals(tag)) {
if (count >= smallNum) {
log.info("tag: {}的数据量已经达到目标,忽略", tag);
return res;
}
Set<String> taskIds = new HashSet<>();
List<String> taskIdLists = new ArrayList<>();
log.info("开始生成 task 数据...");
for (int i = 0; i < num; i++) {
if (i % printNum == 0) {
log.info("task:-{}: {}", count, i);
log.info("task: {}", i);
}
String taskId = generateUniqueId(taskIds);
taskIdLists.add(taskId);
@ -192,7 +173,7 @@ public class GeneratorTestData {
log.info("开始生成 user 数据...");
for (int i = 0; i < num; i++) {
if (i % printNum == 0) {
log.info("user:-{}: {}", count, i);
log.info("user: {}", i);
}
String userId = generateUniqueId(userIds);
userIdLists.add(userId);
@ -223,7 +204,7 @@ public class GeneratorTestData {
log.info("开始生成post数据.....");
for (int i = 0; i < num; i++) {
if (i % printNum == 0) {
log.info("post:-{}: {}", count, i);
log.info("post: {}", i);
}
String postId = generateUniqueId(postIds);
postIdLists.add(postId);
@ -247,16 +228,12 @@ public class GeneratorTestData {
}
ids.put("post", postIdLists);
} else if (Tag.GROUP.getTag().equals(tag)) {
if (count >= smallNum) {
log.info("tag: {}的数据量已经达到目标,忽略", tag);
return res;
}
Set<String> groupIds = new HashSet<>();
List<String> groupIdLists = new ArrayList<>();
log.info("开始生成group数据.....");
for (int i = 0; i < num; i++) {
if (i % printNum == 0) {
log.info("group:-{}: {}", count, i);
log.info("group: {}", i);
}
String groupId = generateUniqueId(groupIds);
groupIdLists.add(groupId);
@ -269,16 +246,12 @@ public class GeneratorTestData {
}
ids.put("group", groupIdLists);
} else if (Tag.ORGANIZATION.getTag().equals(tag)){
if (count >= smallNum) {
log.info("tag: {}的数据量已经达到目标,忽略", tag);
return res;
}
Set<String> orgIds = new HashSet<>();
List<String> orgIdLists = new ArrayList<>();
log.info("开始生成organization数据.....");
for (int i = 0; i < num; i++) {
if (i % printNum == 0) {
log.info("organization-{}: {}", count, i);
log.info("organization: {}", i);
}
String orgId = generateUniqueId(orgIds);
orgIdLists.add(orgId);
@ -306,7 +279,7 @@ public class GeneratorTestData {
* @return
*/
public static List<Map<String, String>> generateEdge(List<String> srcList, List<String> dstList,
String edgeType, int num, int count) {
String edgeType, int num) {
log.info("开始生成边数据: {}", edgeType);
if (srcList == null || dstList == null || srcList.isEmpty() || dstList.isEmpty()) {
return Collections.emptyList();
@ -318,7 +291,7 @@ public class GeneratorTestData {
for (String src : srcList) {
i++;
if (i % 1000 == 0) {
log.info("edge_type: {}-{}, {}",edgeType, count, i);
log.info("edge_type: {}, {}",edgeType, i);
}
List<String> dsts;
@ -388,7 +361,6 @@ public class GeneratorTestData {
for (Map<String, String> row : data) {
writer.writeNext(row.values().toArray(new String[0]));
}
writer.flush();
log.info("写入文件: {}", filename);
} catch (IOException e) {
e.printStackTrace();