Skip to main content
Version: 5.0.0

Custom Parser SDK

Kylin supports custom parsing of Kafka data through the SDK.

Setting up the development environment

Create a Project

  • Use Maven to manage project dependencies

Modify the pom.xml file

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.10.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.3.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <!-- ${artifactId} is deprecated since Maven 3; use the project.* form -->
                        <finalName>${project.artifactId}</finalName>
                        <artifactSet>
                            <!-- Artifacts kept out of the shaded parser jar. The SDK jar comes from
                                 ${KYLIN_HOME}/server/jars; the others are presumably already on
                                 Kylin's classpath (confirm against your Kylin version). -->
                            <excludes>
                                <exclude>org.apache.kylin:kylin-streaming-sdk</exclude>
                                <exclude>org.projectlombok:*</exclude>
                                <exclude>org.apache.commons:*</exclude>
                                <exclude>com.fasterxml.jackson.core:*</exclude>
                                <exclude>com.google.guava:*</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <!-- log4j-slf4j-impl is declared below; do not bundle it -->
                                <exclude>org.apache.logging.log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <!-- Drop signature files so the merged jar is not rejected as tampered -->
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                    <exclude>javax/annotation/**</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Import SDK

Copy ${KYLIN_HOME}/server/jars/kylin-streaming-sdk-{version}.jar.

Create a `lib` directory in the project root directory and put the SDK jar into it.

Load the SDK dependencies into the project

<properties>
    <!-- Compile for Java 8 with UTF-8 sources -->
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Placeholder version for the system-scoped SDK jar below -->
    <noop.version>1</noop.version>
</properties>

<dependencies>
    <!-- Kylin streaming SDK, loaded from the lib/ directory created above.
         Replace {version} with the version of the jar you copied. -->
    <dependency>
        <groupId>org.apache.kylin</groupId>
        <artifactId>kylin-streaming-sdk</artifactId>
        <version>${noop.version}</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/lib/kylin-streaming-sdk-{version}.jar</systemPath>
    </dependency>

    <!-- Logging -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.18.0</version>
    </dependency>

    <!-- Utilities -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.10</version>
    </dependency>
</dependencies>

Implement a custom parser

  • Create a new parser class (for example `XXXParser`) that extends `AbstractDataParser<ByteBuffer>`.
  • Override `parse(ByteBuffer input)`: parse a single record in this method and return a `Map<field name, field value>`.
  • If the parser needs initialization when an instance of the parser class is created, do it in a no-argument constructor.
  • Override `before()` if an initialization action is required before each record is parsed.
  • If an exception is thrown while a record is being parsed, that record is treated as dirty data during the build and is skipped.
  • To check the data after each record is processed, override `after()`.
  • Create the file `${project.basedir}/src/main/resources/META-INF/services/org.apache.kylin.parser.AbstractDataParser` and write the fully qualified class name of each parser class into it, one per line.

Demo Parser

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.kylin</groupId>
    <artifactId>custom-parser-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <noop.version>1</noop.version>
    </properties>

    <dependencies>
        <!-- SDK: adjust the jar name to the version copied into lib/ -->
        <dependency>
            <groupId>org.apache.kylin</groupId>
            <artifactId>kylin-streaming-sdk</artifactId>
            <version>${noop.version}</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/lib/kylin-streaming-sdk-5.0.0-SNAPSHOT.jar</systemPath>
        </dependency>

        <!-- Json -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.14.0</version>
        </dependency>
        <!-- CSV -->
        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>5.7.1</version>
            <exclusions>
                <exclusion>
                    <groupId>commons-collections</groupId>
                    <artifactId>commons-collections</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Others -->
        <!-- Lombok is an annotation processor needed only while compiling -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>31.1-jre</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.18.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.10</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.10.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.3.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- ${artifactId} is deprecated since Maven 3; produces target/custom-parser-demo.jar -->
                            <finalName>${project.artifactId}</finalName>
                            <artifactSet>
                                <!-- Artifacts kept out of the shaded parser jar (presumably already
                                     on Kylin's classpath; confirm against your Kylin version) -->
                                <excludes>
                                    <exclude>org.apache.kylin:kylin-streaming-sdk</exclude>
                                    <exclude>org.projectlombok:*</exclude>
                                    <exclude>org.apache.commons:*</exclude>
                                    <exclude>com.fasterxml.jackson.core:*</exclude>
                                    <exclude>com.google.guava:*</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <!-- Drop signature files so the merged jar is not rejected as tampered -->
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>javax/annotation/**</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Parse JSON

Input sample data

{
"name": "Li",
"sex": "man",
"age": 24,
"addr": {
"country": "China",
"city": "Shanghai",
"region": "YangPu"
},
"works": [
"work_1",
"work_2",
"work_3"
],
"create_time": "2022-11-01 08:00:00",
"update_time": "2022-11-20 12:00:00"
}

Output parsed data

{
"name": "Li",
"sex": "man",
"age": 24,
"addr_country": "China",
"addr_city": "Shanghai",
"addr_region": "YangPu",
"first_work": "work_1",
"create_time": "2022-11-01 08:00:00",
"update_time": "2022-11-20 12:00:00",
"process_time": "2022-11-20 13:00:00"
}

The parser code

package org.apache.kylin.parser.json;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import lombok.Data;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.kylin.parser.AbstractDataParser;
import org.apache.kylin.parser.utils.ParserBenchMark;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * Demo parser that flattens one nested JSON record into a single-level map of output columns.
 */
public class JsonCustomParser extends AbstractDataParser<ByteBuffer> {

    /** Sample record used by {@link #main(String[])} to smoke-test and benchmark the parser. */
    private static final String JSON_INPUT_STR = "{\"name\": \"Li\",\"sex\": \"man\",\"age\": 24,\"addr\": {\"country\": \"China\",\"city\": \"Shanghai\",\"region\": \"YangPu\"},\"works\": [\"work_1\",\"work_2\",\"work_3\"],\"create_time\": \"2022-11-01 08:00:00\",\"update_time\": \"2022-11-20 12:00:00\"}";
    /** Shared mapper: reads input records and converts output entries to maps. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Parses one JSON record into a flat map of output column name to value.
     *
     * @param buffer the raw record; only the bytes between its position and limit are read,
     *               and the buffer's position is not changed
     * @return the flattened record, keyed by the {@link JsonProperty} names of {@link JsonOutputEntry}
     */
    @SneakyThrows
    @Override
    protected Map<String, Object> parse(ByteBuffer buffer) {
        JsonInputEntry inputEntry = readInput(buffer);
        JsonOutputEntry outputEntry = JsonOutputEntry.transform(inputEntry);
        // A typed target avoids the unchecked conversion of the raw Map.class
        return MAPPER.convertValue(outputEntry,
                MAPPER.getTypeFactory().constructMapType(Map.class, String.class, Object.class));
    }

    /**
     * Deserializes exactly the readable window [position, limit) of {@code buffer}.
     * <p>
     * Using {@code buffer.array()} alone is wrong in general. The backing array may be larger
     * than the record (e.g. buffers from {@code Charset.encode}, or slices of a larger batch).
     * It may also start at a non-zero array offset, and it is not accessible at all for direct
     * or read-only buffers.
     */
    @SneakyThrows
    private static JsonInputEntry readInput(ByteBuffer buffer) {
        if (buffer.hasArray()) {
            return MAPPER.readValue(buffer.array(), buffer.arrayOffset() + buffer.position(),
                    buffer.remaining(), JsonInputEntry.class);
        }
        byte[] bytes = new byte[buffer.remaining()];
        // duplicate() so that copying does not move the caller's buffer position
        buffer.duplicate().get(bytes);
        return MAPPER.readValue(bytes, JsonInputEntry.class);
    }

    /** Loads the parser through the SDK, prints one parsed sample and runs the SDK benchmarks. */
    @SneakyThrows
    public static void main(String[] args) {
        // get parser
        AbstractDataParser<ByteBuffer> dataParser = AbstractDataParser.getDataParser(JsonCustomParser.class.getName(),
                Thread.currentThread().getContextClassLoader());
        // parse
        ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode(JSON_INPUT_STR);
        Map<String, Object> resultMap = dataParser.process(byteBuffer);
        System.out.println(MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(resultMap));
        // parser BenchMark
        System.out.printf("parser 20k data, cost: %s ms \n", ParserBenchMark.test20K(byteBuffer, dataParser));
        System.out.printf("parser 40k data, cost: %s ms \n", ParserBenchMark.test40K(byteBuffer, dataParser));
        System.out.printf("parser 60k data, cost: %s ms \n", ParserBenchMark.test60K(byteBuffer, dataParser));
        System.out.printf("parser 999999 data, cost: %s ms \n", ParserBenchMark.testWithSize(byteBuffer, dataParser, 999999));
    }

    /** Shape of the incoming JSON record. */
    @Data
    public static class JsonInputEntry {
        private String name;
        private String sex;
        private int age;
        private Addr addr;
        // No setter: Jackson fills the list through its getter; defaults to empty when "works" is absent
        @Getter
        private final List<String> works = Lists.newArrayList();
        @JsonProperty("create_time")
        private String createTime;
        @JsonProperty("update_time")
        private String updateTime;
    }

    /** Nested "addr" object of the input record. */
    @Data
    public static class Addr {
        private String country;
        private String city;
        private String region;
    }

    /** Flattened output record; the {@link JsonProperty} names become the output column names. */
    @Data
    public static class JsonOutputEntry {
        /** Format of the {@code process_time} column. */
        private static final String PROCESS_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

        private String name;
        private String sex;
        private int age;
        @JsonProperty("addr_country")
        private String addrCountry;
        @JsonProperty("addr_city")
        private String addrCity;
        @JsonProperty("addr_region")
        private String addrRegion;
        @JsonProperty("first_work")
        private String firstWork;
        @JsonProperty("create_time")
        private String createTime;
        @JsonProperty("update_time")
        private String updateTime;
        @JsonProperty("process_time")
        private String processTime;

        /**
         * Flattens an input record.
         * <p>
         * The nested address becomes the {@code addr_*} columns, and the first element of
         * {@code works} becomes {@code first_work}. {@code process_time} is set to the current
         * local time.
         *
         * @param inputEntry the deserialized input record
         * @return the flattened output record
         */
        public static JsonOutputEntry transform(JsonInputEntry inputEntry) {
            JsonOutputEntry outputEntry = new JsonOutputEntry();
            outputEntry.setName(inputEntry.getName());
            outputEntry.setSex(inputEntry.getSex());
            outputEntry.setAge(inputEntry.getAge());
            // Like every other absent field, a missing "addr" yields null columns instead of an NPE
            Addr addr = inputEntry.getAddr();
            if (addr != null) {
                outputEntry.setAddrCountry(addr.getCountry());
                outputEntry.setAddrCity(addr.getCity());
                outputEntry.setAddrRegion(addr.getRegion());
            }
            List<String> works = inputEntry.getWorks();
            outputEntry.setFirstWork(works.isEmpty() ? null : works.get(0));
            outputEntry.setCreateTime(inputEntry.getCreateTime());
            outputEntry.setUpdateTime(inputEntry.getUpdateTime());
            outputEntry.setProcessTime(DateFormatUtils.format(System.currentTimeMillis(), PROCESS_TIME_PATTERN));
            return outputEntry;
        }
    }
}

Parse CSV

This example uses '|' as the CSV separator.

Input sample data

1|Li|"deve|loper"|Table tennis|2022-11-01 08:00:00|2022-11-02 08:00:00

Output parsed data

{
"id" : 1,
"name" : "Li",
"job" : "Table tennis",
"sport" : "deve|loper",
"create_time" : "2022-11-01 08:00:00",
"delete_time" : "2022-11-02 08:00:00",
"process_time" : "2022-11-21 16:28:05"
}

The parser code

package org.apache.kylin.parser.csv;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import lombok.Data;
import lombok.SneakyThrows;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.kylin.parser.AbstractDataParser;
import org.apache.kylin.parser.utils.ParserBenchMark;

import java.io.IOException;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * Demo parser for one '|'-separated CSV record (double-quote quoting, backslash escapes).
 */
public class CsvCustomParser extends AbstractDataParser<ByteBuffer> {

    /** Sample record used by {@link #main(String[])} to smoke-test and benchmark the parser. */
    private static final String CSV_INPUT_STR = "1|Li|\"deve|loper\"|Table tennis|2022-11-01 08:00:00|2022-11-02 08:00:00";
    /** Shared mapper used to convert output entries to maps. */
    private static final ObjectMapper MAPPER = new ObjectMapper();
    /** Number of leading columns {@link CsvOutputEntry#transform(List)} reads from each record. */
    private static final int EXPECTED_COLUMNS = 6;

    /**
     * Parses one CSV record into a map of output column name to value.
     *
     * @param buffer the raw UTF-8 record; only the bytes between its position and limit are read,
     *               and the buffer's position is not changed
     * @return the parsed record, or an empty map when the input contains no record
     */
    @SneakyThrows
    @Override
    protected Map<String, Object> parse(ByteBuffer buffer) {
        // Decode a duplicate: Charset.decode consumes its argument. Decoding the caller's buffer
        // would leave it empty for any later reuse (as the benchmark in main does).
        String text = StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
        try (StringReader reader = new StringReader(text);
             CSVReader csvReader = new CSVReaderBuilder(reader).withCSVParser(newCsvParser()).build()) {
            String[] fields = csvReader.readNext();
            // readNext() returns null when there is no record at all (e.g. empty input)
            if (fields == null || fields.length == 0) {
                return Maps.newHashMap();
            }
            CsvOutputEntry outputEntry = CsvOutputEntry.transform(Lists.newArrayList(fields));
            // A typed target avoids the unchecked conversion of the raw Map.class
            return MAPPER.convertValue(outputEntry,
                    MAPPER.getTypeFactory().constructMapType(Map.class, String.class, Object.class));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds a fresh parser for one record.
     * <p>
     * opencsv's {@link CSVParser} keeps state between calls (a pending, unterminated quoted field).
     * A single shared instance could therefore carry a malformed record's leftovers into the
     * next record.
     */
    private static CSVParser newCsvParser() {
        return new CSVParserBuilder()
                .withSeparator('|')
                .withQuoteChar('"')
                .withEscapeChar('\\')
                .build();
    }

    /** Loads the parser through the SDK, prints one parsed sample and runs the SDK benchmarks. */
    @SneakyThrows
    public static void main(String[] args) {
        // get parser
        AbstractDataParser<ByteBuffer> dataParser = AbstractDataParser.getDataParser(CsvCustomParser.class.getName(),
                Thread.currentThread().getContextClassLoader());
        // parse
        ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode(CSV_INPUT_STR);
        Map<String, Object> resultMap = dataParser.process(byteBuffer);
        System.out.println(MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(resultMap));
        // parser BenchMark
        System.out.printf("parser 20k data, cost: %s ms \n", ParserBenchMark.test20K(byteBuffer, dataParser));
        System.out.printf("parser 40k data, cost: %s ms \n", ParserBenchMark.test40K(byteBuffer, dataParser));
        System.out.printf("parser 60k data, cost: %s ms \n", ParserBenchMark.test60K(byteBuffer, dataParser));
        System.out.printf("parser 999999 data, cost: %s ms \n", ParserBenchMark.testWithSize(byteBuffer, dataParser, 999999));
    }

    /** Output record; the {@link JsonProperty} names become the output column names. */
    @Data
    public static class CsvOutputEntry {
        /** Format of the {@code process_time} column. */
        private static final String PROCESS_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

        private long id;
        private String name;
        private String job;
        private String sport;
        @JsonProperty("create_time")
        private String createTime;
        @JsonProperty("delete_time")
        private String deleteTime;
        @JsonProperty("process_time")
        private String processTime;

        /**
         * Maps the record's columns, in order: id, name, sport, job, create_time, delete_time.
         * {@code process_time} is set to the current local time.
         *
         * @param line the record's column values
         * @return the output record
         * @throws IllegalArgumentException if the record has fewer than the expected number of columns
         * @throws NumberFormatException if the first column is not a valid long
         */
        public static CsvOutputEntry transform(List<String> line) {
            if (line.size() < EXPECTED_COLUMNS) {
                throw new IllegalArgumentException(
                        "Expected at least " + EXPECTED_COLUMNS + " columns but got " + line.size());
            }
            CsvOutputEntry outputEntry = new CsvOutputEntry();
            outputEntry.setId(Long.parseLong(line.get(0)));
            outputEntry.setName(line.get(1));
            outputEntry.setSport(line.get(2));
            outputEntry.setJob(line.get(3));
            outputEntry.setCreateTime(line.get(4));
            outputEntry.setDeleteTime(line.get(5));
            outputEntry.setProcessTime(DateFormatUtils.format(System.currentTimeMillis(), PROCESS_TIME_PATTERN));
            return outputEntry;
        }
    }
}

Project Package

mvn clean package -DskipTests

This generates `${project.basedir}/target/custom-parser-demo.jar`.

Upload a custom parser to the system

For more details, please refer to Custom Parser Jar Package Management API

Create Kafka tables using a custom parser

Json

CSV