Compare commits

..

No commits in common. "d4457be1c6346ae767a16cd87b878eaeb890aa41" and "709e1e31f938d86dbb96aa8eb7c3df37b3d6fe83" have entirely different histories.

2 changed files with 48 additions and 79 deletions

3
.gitignore vendored
View File

@ -5,11 +5,9 @@ target/
### IntelliJ IDEA ### ### IntelliJ IDEA ###
.idea/modules.xml .idea/modules.xml
.idea/misc.xml
.idea/jarRepositories.xml .idea/jarRepositories.xml
.idea/compiler.xml .idea/compiler.xml
.idea/libraries/ .idea/libraries/
.idea/inspectionProfiles/Project_Default.xml
*.iws *.iws
*.iml *.iml
*.ipr *.ipr
@ -38,4 +36,3 @@ build/
### Mac OS ### ### Mac OS ###
.DS_Store .DS_Store
/.idea/

View File

@ -35,16 +35,19 @@ public class GeneratorTestData {
); );
private static final Logger log = LoggerFactory.getLogger(GeneratorTestData.class); private static final Logger log = LoggerFactory.getLogger(GeneratorTestData.class);
private static String generateId() { private static String generateId() {
return faker.bothify("????????????????????????????????????????"); return faker.bothify("????????????????????????????????????????");
} }
public static final int printNum = 1000; public static final int printNum = 1000;
public static final int circleNum = 10; public static final int circleNum = 3;
public static final int smallNum = 1; private static Map<String, List<String>> ids = new HashMap<>();
public static String outputDir = ""; public static String outputDir = "";
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
Map<String, List<String>> ids = new HashMap<>();
if (args != null && args.length >= 1) { if (args != null && args.length >= 1) {
for (String arg : args) { for (String arg : args) {
outputDir = arg; outputDir = arg;
@ -56,72 +59,31 @@ public class GeneratorTestData {
} }
// 创建目录 // 创建目录
Files.createDirectories(Paths.get(outputDir)); Files.createDirectories(Paths.get(outputDir));
// 创建tag线程池
ThreadPoolExecutor tagExecutor = new ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<>(), // workQueue 不指定初始大小默认无界
Executors.defaultThreadFactory() // threadFactory
);
// 创建edge线程池
ThreadPoolExecutor edgeExecutor = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<>(), // workQueue 不指定初始大小默认无界
Executors.defaultThreadFactory() // threadFactory
);
for (int i = 0; i < circleNum; i++) { for (int i = 0; i < circleNum; i++) {
Map<String, List<String>> ids = new HashMap<>(); generateData(ids, i);
generateData(tagExecutor, ids, i); generateEdge(i);
generateEdge(edgeExecutor, ids, i); ids = new HashMap<>();
} }
// 关闭线程不再接受新的任务
showdownExecutor(tagExecutor);
showdownExecutor(edgeExecutor);
log.info("所有数据生成执行结束...."); log.info("所有数据生成执行结束....");
} }
/** private static void generateData(Map<String, List<String>> ids, int count) {
* 关闭线程池 ExecutorService tagExecutor = Executors.newFixedThreadPool(5);
* @param executor
*/
private static void showdownExecutor(ExecutorService executor) {
executor.shutdown();
// 确保线程池在程序结束前完全关闭
if (!executor.isTerminated()) {
try {
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
log.error("Some tasks did not complete within the specified time.");
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
private static void generateData(ExecutorService tagExecutor, Map<String, List<String>> ids, int count) {
// 使用一个列表来保存所有的Future对象 // 使用一个列表来保存所有的Future对象
List<Future<?>> tagFutures = new ArrayList<>(); List<Future<?>> tagFutures = new ArrayList<>();
for (Tag value : Tag.values()) { for (Tag value : Tag.values()) {
String finalOutputDir = outputDir; String finalOutputDir = outputDir;
Future<?> future = tagExecutor.submit(() -> { Future<?> future = tagExecutor.submit(() -> {
// 生成点数据 // 生成点数据
List<Map<String, String>> list = generateData(value.getNum(), value.getTag(), ids, count); List<Map<String, String>> list = generateData(value.getNum(), value.getTag(), ids);
// 写入点数据 // 写入点数据
writeToCsv(list, finalOutputDir + "/" + value.getTag() + "-" + count + "s.csv"); writeToCsv(list, finalOutputDir + "/" + value.getTag() + "-" + count + "s.csv");
}); });
tagFutures.add(future); tagFutures.add(future);
} }
// 关闭线程不再接受新的任务
tagExecutor.shutdown();
try { try {
for (Future<?> tagFuture : tagFutures) { for (Future<?> tagFuture : tagFutures) {
tagFuture.get(); tagFuture.get();
@ -133,22 +95,34 @@ public class GeneratorTestData {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
log.info("点数据已成功写入 CSV 文件"); log.info("点数据已成功写入 CSV 文件");
// 确保线程池在程序结束前完全关闭
if (!tagExecutor.isTerminated()) {
try {
if (!tagExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
log.error("Some tasks did not complete within the specified time.");
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
} }
private static void generateEdge(ExecutorService edgeExecutor, Map<String, List<String>> ids, int count) { private static void generateEdge(int count) {
// 生成并写入边数据 // 生成并写入边数据
ExecutorService edgeExecutor = Executors.newFixedThreadPool(10);
List<Future<?>> edgeFutures = new ArrayList<>(); List<Future<?>> edgeFutures = new ArrayList<>();
for (EdgeToTagEnums value : EdgeToTagEnums.values()) { for (EdgeToTagEnums value : EdgeToTagEnums.values()) {
String finalOutputDir = outputDir; String finalOutputDir = outputDir;
Future<?> edgeFuture = edgeExecutor.submit(() -> { Future<?> edgeFuture = edgeExecutor.submit(() -> {
String type = value.getType(); String type = value.getType();
List<Map<String, String>> edgeData = generateEdge(ids.get(value.getSrcTag()), List<Map<String, String>> edgeData = generateEdge(ids.get(value.getSrcTag()), ids.get(value.getDstTag()), type, 100);
ids.get(value.getDstTag()), type, 100, count);
// 开始写入边数据 // 开始写入边数据
writeToCsv(edgeData, finalOutputDir + "/" + type + "-" + count + ".csv"); writeToCsv(edgeData, finalOutputDir + "/" + type + "-" + count + ".csv");
}); });
edgeFutures.add(edgeFuture); edgeFutures.add(edgeFuture);
} }
edgeExecutor.shutdown();
try { try {
for (Future<?> edgeFuture : edgeFutures) { for (Future<?> edgeFuture : edgeFutures) {
edgeFuture.get(); edgeFuture.get();
@ -159,23 +133,30 @@ public class GeneratorTestData {
// 如果主线程被中断恢复中断状态 // 如果主线程被中断恢复中断状态
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
// 确保线程池在程序结束前完全关闭
if (!edgeExecutor.isTerminated()) {
try {
if (!edgeExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
log.error("Some tasks did not complete within the specified time.");
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
} }
private static List<Map<String, String>> generateData(int num, String tag, Map<String, List<String>> ids, int count) { private static List<Map<String, String>> generateData(int num, String tag, Map<String, List<String>> ids) {
List<Map<String, String>> res = new ArrayList<>(); List<Map<String, String>> res = new ArrayList<>();
log.info("传入的参数是: " + tag); log.info("传入的参数是: " + tag);
if (Tag.TASK.getTag().equals(tag)) { if (Tag.TASK.getTag().equals(tag)) {
if (count >= smallNum) {
log.info("tag: {}的数据量已经达到目标,忽略", tag);
return res;
}
Set<String> taskIds = new HashSet<>(); Set<String> taskIds = new HashSet<>();
List<String> taskIdLists = new ArrayList<>(); List<String> taskIdLists = new ArrayList<>();
log.info("开始生成 task 数据..."); log.info("开始生成 task 数据...");
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
if (i % printNum == 0) { if (i % printNum == 0) {
log.info("task:-{}: {}", count, i); log.info("task: {}", i);
} }
String taskId = generateUniqueId(taskIds); String taskId = generateUniqueId(taskIds);
taskIdLists.add(taskId); taskIdLists.add(taskId);
@ -192,7 +173,7 @@ public class GeneratorTestData {
log.info("开始生成 user 数据..."); log.info("开始生成 user 数据...");
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
if (i % printNum == 0) { if (i % printNum == 0) {
log.info("user:-{}: {}", count, i); log.info("user: {}", i);
} }
String userId = generateUniqueId(userIds); String userId = generateUniqueId(userIds);
userIdLists.add(userId); userIdLists.add(userId);
@ -223,7 +204,7 @@ public class GeneratorTestData {
log.info("开始生成post数据....."); log.info("开始生成post数据.....");
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
if (i % printNum == 0) { if (i % printNum == 0) {
log.info("post:-{}: {}", count, i); log.info("post: {}", i);
} }
String postId = generateUniqueId(postIds); String postId = generateUniqueId(postIds);
postIdLists.add(postId); postIdLists.add(postId);
@ -247,16 +228,12 @@ public class GeneratorTestData {
} }
ids.put("post", postIdLists); ids.put("post", postIdLists);
} else if (Tag.GROUP.getTag().equals(tag)) { } else if (Tag.GROUP.getTag().equals(tag)) {
if (count >= smallNum) {
log.info("tag: {}的数据量已经达到目标,忽略", tag);
return res;
}
Set<String> groupIds = new HashSet<>(); Set<String> groupIds = new HashSet<>();
List<String> groupIdLists = new ArrayList<>(); List<String> groupIdLists = new ArrayList<>();
log.info("开始生成group数据....."); log.info("开始生成group数据.....");
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
if (i % printNum == 0) { if (i % printNum == 0) {
log.info("group:-{}: {}", count, i); log.info("group: {}", i);
} }
String groupId = generateUniqueId(groupIds); String groupId = generateUniqueId(groupIds);
groupIdLists.add(groupId); groupIdLists.add(groupId);
@ -269,16 +246,12 @@ public class GeneratorTestData {
} }
ids.put("group", groupIdLists); ids.put("group", groupIdLists);
} else if (Tag.ORGANIZATION.getTag().equals(tag)){ } else if (Tag.ORGANIZATION.getTag().equals(tag)){
if (count >= smallNum) {
log.info("tag: {}的数据量已经达到目标,忽略", tag);
return res;
}
Set<String> orgIds = new HashSet<>(); Set<String> orgIds = new HashSet<>();
List<String> orgIdLists = new ArrayList<>(); List<String> orgIdLists = new ArrayList<>();
log.info("开始生成organization数据....."); log.info("开始生成organization数据.....");
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
if (i % printNum == 0) { if (i % printNum == 0) {
log.info("organization-{}: {}", count, i); log.info("organization: {}", i);
} }
String orgId = generateUniqueId(orgIds); String orgId = generateUniqueId(orgIds);
orgIdLists.add(orgId); orgIdLists.add(orgId);
@ -306,7 +279,7 @@ public class GeneratorTestData {
* @return * @return
*/ */
public static List<Map<String, String>> generateEdge(List<String> srcList, List<String> dstList, public static List<Map<String, String>> generateEdge(List<String> srcList, List<String> dstList,
String edgeType, int num, int count) { String edgeType, int num) {
log.info("开始生成边数据: {}", edgeType); log.info("开始生成边数据: {}", edgeType);
if (srcList == null || dstList == null || srcList.isEmpty() || dstList.isEmpty()) { if (srcList == null || dstList == null || srcList.isEmpty() || dstList.isEmpty()) {
return Collections.emptyList(); return Collections.emptyList();
@ -318,7 +291,7 @@ public class GeneratorTestData {
for (String src : srcList) { for (String src : srcList) {
i++; i++;
if (i % 1000 == 0) { if (i % 1000 == 0) {
log.info("edge_type: {}-{}, {}",edgeType, count, i); log.info("edge_type: {}, {}",edgeType, i);
} }
List<String> dsts; List<String> dsts;
@ -388,7 +361,6 @@ public class GeneratorTestData {
for (Map<String, String> row : data) { for (Map<String, String> row : data) {
writer.writeNext(row.values().toArray(new String[0])); writer.writeNext(row.values().toArray(new String[0]));
} }
writer.flush();
log.info("写入文件: {}", filename); log.info("写入文件: {}", filename);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();