Compare commits

...

3 Commits

Author SHA1 Message Date
8e839fda65 kafka 2024-12-27 16:32:58 +08:00
c7b42355fc kafka 2024-12-27 16:32:18 +08:00
d325251979 kafka 2024-12-27 16:22:26 +08:00
6 changed files with 256 additions and 0 deletions

View File

@ -1,5 +1,120 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="HttpUrlsUsage" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredUrls">
<list>
<option value="http://0.0.0.0" />
<option value="http://127.0.0.1" />
<option value="http://activemq.apache.org/schema/" />
<option value="http://cxf.apache.org/schemas/" />
<option value="http://java.sun.com/" />
<option value="http://javafx.com/fxml" />
<option value="http://javafx.com/javafx/" />
<option value="http://json-schema.org/draft" />
<option value="http://localhost" />
<option value="http://maven.apache.org/POM/" />
<option value="http://maven.apache.org/xsd/" />
<option value="http://packages.confluent.io" />
<option value="http://primefaces.org/ui" />
<option value="http://schema.cloudfoundry.org/spring/" />
<option value="http://schemas.xmlsoap.org/" />
<option value="http://tiles.apache.org/" />
<option value="http://www.ibm.com/webservices/xsd" />
<option value="http://www.jboss.com/xml/ns/" />
<option value="http://www.jboss.org/j2ee/schema/" />
<option value="http://www.springframework.org/schema/" />
<option value="http://www.springframework.org/security/tags" />
<option value="http://www.springframework.org/tags" />
<option value="http://www.thymeleaf.org" />
<option value="http://www.w3.org/" />
<option value="http://xmlns.jcp.org/" />
</list>
</option>
</inspection_tool>
<inspection_tool class="VulnerableLibrariesLocal" enabled="true" level="WARNING" enabled_by_default="true">
<option name="isIgnoringEnabled" value="true" />
<option name="ignoredModules">
<list>
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
<option value="y-server-tools" />
</list>
</option>
<option name="ignoredPackages">
<list>
<option value="ch.qos.logback:logback-classic:1.2.3" />
<option value="com.jayway.jsonpath:json-path:2.4.0" />
<option value="net.minidev:json-smart:2.3" />
<option value="org.xmlunit:xmlunit-core:2.6.3" />
<option value="ch.qos.logback:logback-core:1.2.3" />
<option value="org.springframework.boot:spring-boot-autoconfigure:2.2.5.RELEASE" />
<option value="org.springframework.boot:spring-boot:2.2.5.RELEASE" />
<option value="org.springframework:spring-context:5.2.4.RELEASE" />
<option value="org.springframework:spring-core:5.2.4.RELEASE" />
<option value="org.yaml:snakeyaml:1.25" />
<option value="com.fasterxml.jackson.core:jackson-databind:2.10.2" />
<option value="org.apache.tomcat.embed:tomcat-embed-core:9.0.31" />
<option value="org.apache.tomcat.embed:tomcat-embed-websocket:9.0.31" />
<option value="org.hibernate.validator:hibernate-validator:6.0.18.Final" />
<option value="org.springframework.boot:spring-boot-starter-web:2.2.5.RELEASE" />
<option value="org.apache.kafka:kafka-clients:2.3.1" />
<option value="org.springframework:spring-beans:5.2.4.RELEASE" />
<option value="org.springframework:spring-messaging:5.2.4.RELEASE" />
<option value="org.springframework:spring-expression:5.2.4.RELEASE" />
<option value="junit:junit:4.12" />
<option value="org.apache.avro:avro:1.8.2" />
<option value="io.netty:netty:3.10.6.Final" />
<option value="org.apache.zookeeper:zookeeper:3.4.14" />
</list>
</option>
<option name="ignoredReasons">
<list>
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
<option value="Not exploitable" />
</list>
</option>
</inspection_tool>
</profile>
</component>

View File

@ -7,6 +7,7 @@
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
<option name="workspaceImportForciblyTurnedOn" value="true" />
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />

72
pom.xml
View File

@ -12,6 +12,78 @@
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<avro.version>1.8.2</avro.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.12</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>5.3.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,13 @@
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Demo2Application {
public static void main(String[] args) {
SpringApplication.run(Demo2Application.class, args);
}
}

View File

@ -0,0 +1,28 @@
package com.example.demo.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = {"dwd_post_data_k","dwd_comment_data_k","dwd_user_data_k","dwd_relation_data_k","dwd_action_data_k"}, groupId = "new_line_test")
public void postData(ConsumerRecord<String, GenericRecord> message)
{
logMessage(message);
}
private void logMessage(ConsumerRecord<String, GenericRecord> message){
String sb = "receive message:" +
"topic:" + message.topic() +
"key:" + message.key() +
"value:" + message.value();
log.info(sb);
}
}

View File

@ -0,0 +1,27 @@
server:
port: 18081
spring:
application:
name: application-kafka
kafka:
bootstrap-servers: 10.103.3.1:9092,10.103.3.17:9092,10.103.3.18:9092,10.103.3.50:9092,10.103.3.51:9092,10.103.3.52:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
retries: 0
acks: all
consumer:
group-id: new_line_test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
auto-offset-reset: latest
enable-auto-commit: true
properties:
schema.registry.url: http://10.103.3.1:8081
security.protocol: SASL_PLAINTEXT
sasl:
mechanism: SCRAM-SHA-512
jaas.config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="Admin@123";'
listener:
missing-topics-fatal: false