Compare commits
2 Commits
709e1e31f9
...
d4457be1c6
| Author | SHA1 | Date | |
|---|---|---|---|
| d4457be1c6 | |||
| c6b786f1ae |
3
.gitignore
vendored
3
.gitignore
vendored
|
|
@ -5,9 +5,11 @@ target/
|
|||
|
||||
### IntelliJ IDEA ###
|
||||
.idea/modules.xml
|
||||
.idea/misc.xml
|
||||
.idea/jarRepositories.xml
|
||||
.idea/compiler.xml
|
||||
.idea/libraries/
|
||||
.idea/inspectionProfiles/Project_Default.xml
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
|
@ -36,3 +38,4 @@ build/
|
|||
|
||||
### Mac OS ###
|
||||
.DS_Store
|
||||
/.idea/
|
||||
|
|
|
|||
|
|
@ -35,19 +35,16 @@ public class GeneratorTestData {
|
|||
);
|
||||
private static final Logger log = LoggerFactory.getLogger(GeneratorTestData.class);
|
||||
|
||||
|
||||
private static String generateId() {
|
||||
return faker.bothify("????????????????????????????????????????");
|
||||
}
|
||||
|
||||
public static final int printNum = 1000;
|
||||
public static final int circleNum = 3;
|
||||
private static Map<String, List<String>> ids = new HashMap<>();
|
||||
public static final int circleNum = 10;
|
||||
public static final int smallNum = 1;
|
||||
public static String outputDir = "";
|
||||
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
Map<String, List<String>> ids = new HashMap<>();
|
||||
if (args != null && args.length >= 1) {
|
||||
for (String arg : args) {
|
||||
outputDir = arg;
|
||||
|
|
@ -59,31 +56,72 @@ public class GeneratorTestData {
|
|||
}
|
||||
// 创建目录
|
||||
Files.createDirectories(Paths.get(outputDir));
|
||||
// 创建tag线程池
|
||||
ThreadPoolExecutor tagExecutor = new ThreadPoolExecutor(
|
||||
2, // corePoolSize
|
||||
4, // maximumPoolSize
|
||||
60, // keepAliveTime
|
||||
TimeUnit.SECONDS, // unit
|
||||
new LinkedBlockingQueue<>(), // workQueue 不指定初始大小,默认无界
|
||||
Executors.defaultThreadFactory() // threadFactory
|
||||
);
|
||||
// 创建edge线程池
|
||||
ThreadPoolExecutor edgeExecutor = new ThreadPoolExecutor(
|
||||
5, // corePoolSize
|
||||
10, // maximumPoolSize
|
||||
60, // keepAliveTime
|
||||
TimeUnit.SECONDS, // unit
|
||||
new LinkedBlockingQueue<>(), // workQueue 不指定初始大小,默认无界
|
||||
Executors.defaultThreadFactory() // threadFactory
|
||||
);
|
||||
|
||||
for (int i = 0; i < circleNum; i++) {
|
||||
generateData(ids, i);
|
||||
generateEdge(i);
|
||||
ids = new HashMap<>();
|
||||
Map<String, List<String>> ids = new HashMap<>();
|
||||
generateData(tagExecutor, ids, i);
|
||||
generateEdge(edgeExecutor, ids, i);
|
||||
}
|
||||
|
||||
// 关闭线程,不再接受新的任务
|
||||
showdownExecutor(tagExecutor);
|
||||
showdownExecutor(edgeExecutor);
|
||||
|
||||
log.info("所有数据生成执行结束....");
|
||||
}
|
||||
|
||||
private static void generateData(Map<String, List<String>> ids, int count) {
|
||||
ExecutorService tagExecutor = Executors.newFixedThreadPool(5);
|
||||
/**
|
||||
* 关闭线程池
|
||||
* @param executor
|
||||
*/
|
||||
private static void showdownExecutor(ExecutorService executor) {
|
||||
executor.shutdown();
|
||||
// 确保线程池在程序结束前完全关闭
|
||||
if (!executor.isTerminated()) {
|
||||
try {
|
||||
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
|
||||
log.error("Some tasks did not complete within the specified time.");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void generateData(ExecutorService tagExecutor, Map<String, List<String>> ids, int count) {
|
||||
|
||||
// 使用一个列表来保存所有的Future对象
|
||||
List<Future<?>> tagFutures = new ArrayList<>();
|
||||
for (Tag value : Tag.values()) {
|
||||
String finalOutputDir = outputDir;
|
||||
Future<?> future = tagExecutor.submit(() -> {
|
||||
// 生成点数据
|
||||
List<Map<String, String>> list = generateData(value.getNum(), value.getTag(), ids);
|
||||
List<Map<String, String>> list = generateData(value.getNum(), value.getTag(), ids, count);
|
||||
// 写入点数据
|
||||
writeToCsv(list, finalOutputDir + "/" + value.getTag() + "-" + count + "s.csv");
|
||||
});
|
||||
tagFutures.add(future);
|
||||
}
|
||||
// 关闭线程,不再接受新的任务
|
||||
tagExecutor.shutdown();
|
||||
|
||||
try {
|
||||
for (Future<?> tagFuture : tagFutures) {
|
||||
tagFuture.get();
|
||||
|
|
@ -95,34 +133,22 @@ public class GeneratorTestData {
|
|||
Thread.currentThread().interrupt();
|
||||
}
|
||||
log.info("点数据已成功写入 CSV 文件");
|
||||
// 确保线程池在程序结束前完全关闭
|
||||
if (!tagExecutor.isTerminated()) {
|
||||
try {
|
||||
if (!tagExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
|
||||
log.error("Some tasks did not complete within the specified time.");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void generateEdge(int count) {
|
||||
private static void generateEdge(ExecutorService edgeExecutor, Map<String, List<String>> ids, int count) {
|
||||
// 生成并写入边数据
|
||||
ExecutorService edgeExecutor = Executors.newFixedThreadPool(10);
|
||||
List<Future<?>> edgeFutures = new ArrayList<>();
|
||||
for (EdgeToTagEnums value : EdgeToTagEnums.values()) {
|
||||
String finalOutputDir = outputDir;
|
||||
Future<?> edgeFuture = edgeExecutor.submit(() -> {
|
||||
String type = value.getType();
|
||||
List<Map<String, String>> edgeData = generateEdge(ids.get(value.getSrcTag()), ids.get(value.getDstTag()), type, 100);
|
||||
List<Map<String, String>> edgeData = generateEdge(ids.get(value.getSrcTag()),
|
||||
ids.get(value.getDstTag()), type, 100, count);
|
||||
// 开始写入边数据
|
||||
writeToCsv(edgeData, finalOutputDir + "/" + type + "-" + count + ".csv");
|
||||
});
|
||||
edgeFutures.add(edgeFuture);
|
||||
}
|
||||
edgeExecutor.shutdown();
|
||||
try {
|
||||
for (Future<?> edgeFuture : edgeFutures) {
|
||||
edgeFuture.get();
|
||||
|
|
@ -133,30 +159,23 @@ public class GeneratorTestData {
|
|||
// 如果主线程被中断,恢复中断状态
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
// 确保线程池在程序结束前完全关闭
|
||||
if (!edgeExecutor.isTerminated()) {
|
||||
try {
|
||||
if (!edgeExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
|
||||
log.error("Some tasks did not complete within the specified time.");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static List<Map<String, String>> generateData(int num, String tag, Map<String, List<String>> ids) {
|
||||
private static List<Map<String, String>> generateData(int num, String tag, Map<String, List<String>> ids, int count) {
|
||||
List<Map<String, String>> res = new ArrayList<>();
|
||||
log.info("传入的参数是: " + tag);
|
||||
|
||||
if (Tag.TASK.getTag().equals(tag)) {
|
||||
if (count >= smallNum) {
|
||||
log.info("tag: {}的数据量已经达到目标,忽略", tag);
|
||||
return res;
|
||||
}
|
||||
Set<String> taskIds = new HashSet<>();
|
||||
List<String> taskIdLists = new ArrayList<>();
|
||||
log.info("开始生成 task 数据...");
|
||||
for (int i = 0; i < num; i++) {
|
||||
if (i % printNum == 0) {
|
||||
log.info("task: {}", i);
|
||||
log.info("task:-{}: {}", count, i);
|
||||
}
|
||||
String taskId = generateUniqueId(taskIds);
|
||||
taskIdLists.add(taskId);
|
||||
|
|
@ -173,7 +192,7 @@ public class GeneratorTestData {
|
|||
log.info("开始生成 user 数据...");
|
||||
for (int i = 0; i < num; i++) {
|
||||
if (i % printNum == 0) {
|
||||
log.info("user: {}", i);
|
||||
log.info("user:-{}: {}", count, i);
|
||||
}
|
||||
String userId = generateUniqueId(userIds);
|
||||
userIdLists.add(userId);
|
||||
|
|
@ -204,7 +223,7 @@ public class GeneratorTestData {
|
|||
log.info("开始生成post数据.....");
|
||||
for (int i = 0; i < num; i++) {
|
||||
if (i % printNum == 0) {
|
||||
log.info("post: {}", i);
|
||||
log.info("post:-{}: {}", count, i);
|
||||
}
|
||||
String postId = generateUniqueId(postIds);
|
||||
postIdLists.add(postId);
|
||||
|
|
@ -228,12 +247,16 @@ public class GeneratorTestData {
|
|||
}
|
||||
ids.put("post", postIdLists);
|
||||
} else if (Tag.GROUP.getTag().equals(tag)) {
|
||||
if (count >= smallNum) {
|
||||
log.info("tag: {}的数据量已经达到目标,忽略", tag);
|
||||
return res;
|
||||
}
|
||||
Set<String> groupIds = new HashSet<>();
|
||||
List<String> groupIdLists = new ArrayList<>();
|
||||
log.info("开始生成group数据.....");
|
||||
for (int i = 0; i < num; i++) {
|
||||
if (i % printNum == 0) {
|
||||
log.info("group: {}", i);
|
||||
log.info("group:-{}: {}", count, i);
|
||||
}
|
||||
String groupId = generateUniqueId(groupIds);
|
||||
groupIdLists.add(groupId);
|
||||
|
|
@ -246,12 +269,16 @@ public class GeneratorTestData {
|
|||
}
|
||||
ids.put("group", groupIdLists);
|
||||
} else if (Tag.ORGANIZATION.getTag().equals(tag)){
|
||||
if (count >= smallNum) {
|
||||
log.info("tag: {}的数据量已经达到目标,忽略", tag);
|
||||
return res;
|
||||
}
|
||||
Set<String> orgIds = new HashSet<>();
|
||||
List<String> orgIdLists = new ArrayList<>();
|
||||
log.info("开始生成organization数据.....");
|
||||
for (int i = 0; i < num; i++) {
|
||||
if (i % printNum == 0) {
|
||||
log.info("organization: {}", i);
|
||||
log.info("organization-{}: {}", count, i);
|
||||
}
|
||||
String orgId = generateUniqueId(orgIds);
|
||||
orgIdLists.add(orgId);
|
||||
|
|
@ -279,7 +306,7 @@ public class GeneratorTestData {
|
|||
* @return
|
||||
*/
|
||||
public static List<Map<String, String>> generateEdge(List<String> srcList, List<String> dstList,
|
||||
String edgeType, int num) {
|
||||
String edgeType, int num, int count) {
|
||||
log.info("开始生成边数据: {}", edgeType);
|
||||
if (srcList == null || dstList == null || srcList.isEmpty() || dstList.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
|
|
@ -291,7 +318,7 @@ public class GeneratorTestData {
|
|||
for (String src : srcList) {
|
||||
i++;
|
||||
if (i % 1000 == 0) {
|
||||
log.info("edge_type: {}, {}",edgeType, i);
|
||||
log.info("edge_type: {}-{}, {}",edgeType, count, i);
|
||||
}
|
||||
|
||||
List<String> dsts;
|
||||
|
|
@ -361,6 +388,7 @@ public class GeneratorTestData {
|
|||
for (Map<String, String> row : data) {
|
||||
writer.writeNext(row.values().toArray(new String[0]));
|
||||
}
|
||||
writer.flush();
|
||||
log.info("写入文件: {}", filename);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user