<!Kafka Help Alert!>
koseran
Posted on January 16, 2024
Hello i'm working in a University project and i have really stack at this. I must create my own Kafka Serializer.I have do a lot of tries but i have this error...
this is my Producer
package org.example.kafkaApplication.Producer;
import org.example.kafkaApplication.JsonSerializer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class ProducerMain {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
try (org.apache.kafka.clients.producer.Producer<String, Task> producer = new KafkaProducer<>(properties)) {
for (int i = 0; i < 20; i++) {
String taskId = "Task" + i;
String studentId = "Student" + i;
String subject = "Subject" + (i % 4); // 4 different work topics
String dateOfSubmission = "2023-01-01";
Task task = new Task(taskId, studentId, subject, dateOfSubmission);
ProducerRecord<String, Task> record = new ProducerRecord<>("task.events", task);
producer.send(record);
}
}
}
}
this is my Object class
package org.example.kafkaApplication.Producer;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
public class Task {
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"taskId",
"studentId",
"subject",
"dateOfSubmission",
})
@JsonProperty("taskId")
private String taskId;
@JsonProperty("studentId")
private String studentId;
@JsonProperty("subject")
private String subject;
@JsonProperty("dateOfSubmission")
private String dateOfSubmission;
public Task(String taskId, String studentId, String subject, String dateOfSubmission) {
this.taskId = taskId;
this.studentId = studentId;
this.subject = subject;
this.dateOfSubmission = dateOfSubmission;
}
@JsonProperty("taskId")
public String getTaskId() {
return taskId;
}
@JsonProperty("taskId")
public void setTaskId(String taskId) {
this.taskId = taskId;
}
@JsonProperty("studentId")
public String getStudentId() {
return studentId;
}
@JsonProperty("studentId")
public void setStudentId(String studentId) {
this.studentId = studentId;
}
@JsonProperty("subject")
public String getSubject() {
return subject;
}
@JsonProperty("subject")
public void setSubject(String subject) {
this.subject = subject;
}
@JsonProperty("dateOfSubmission")
public String getDateOfSubmission() {
return dateOfSubmission;
}
@JsonProperty("dateOfSubmission")
public void setDateOfSubmission(String dateOfSubmission) {
this.dateOfSubmission = dateOfSubmission;
}
}
this is my jshonScema
{
"type": "object",
"javaType" : "org.example.kafkaApplication.Producer.Task",
"properties": {
"taskId": {
"type": "string"
},
"studentId": {
"type": "string"
},
"subject": {
"type": "string"
},
"dateOfSubmission": {
"type": "string"
}
},
"required": ["taskId", "studentId", "subject", "dateOfSubmission"],
"additionalProperties": false
}
and this is my pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>KafkaApplication</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jackson.version>2.16.2</jackson.version>
</properties>
<build>
<plugins>
<!-- Maven Compiler Plugin-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- Json Schema to POJO plugin-->
<plugin>
<groupId>org.jsonschema2pojo</groupId>
<artifactId>jsonschema2pojo-maven-plugin</artifactId>
<version>0.5.1</version>
<executions>
<execution>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
<includeAdditionalProperties>false</includeAdditionalProperties>
<includeHashcodeAndEquals>false</includeHashcodeAndEquals>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.5</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.14</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.16.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.16.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.16.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.16.1</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
</project>
Have you any idea?
💖 💪 🙅 🚩
koseran
Posted on January 16, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.