summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-08-01 10:29:46 +0800
committerdoufenghu <[email protected]>2024-08-01 10:29:46 +0800
commitd2d809407163b172333f69a9dd28f6eed6f4a44e (patch)
tree64577cb487e4510f7d429f8e269f1b6b6bf07dd6
parent708137a41c1806b4bc6925fc71b0a2892862d9ea (diff)
[feature][e2e-test] add e2e-clickhouse module
-rw-r--r--groot-tests/pom.xml1
-rw-r--r--groot-tests/test-e2e-clickhouse/pom.xml63
-rw-r--r--groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java363
-rw-r--r--groot-tests/test-e2e-clickhouse/src/test/resources/init/clickhouse_init.conf143
-rw-r--r--groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaIT.java (renamed from groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java)5
-rw-r--r--pom.xml46
6 files changed, 595 insertions, 26 deletions
diff --git a/groot-tests/pom.xml b/groot-tests/pom.xml
index 76f533a..b46ad10 100644
--- a/groot-tests/pom.xml
+++ b/groot-tests/pom.xml
@@ -16,6 +16,7 @@
<module>test-common</module>
<module>test-e2e-base</module>
<module>test-e2e-kafka</module>
+ <module>test-e2e-clickhouse</module>
</modules>
<properties>
diff --git a/groot-tests/test-e2e-clickhouse/pom.xml b/groot-tests/test-e2e-clickhouse/pom.xml
new file mode 100644
index 0000000..c576100
--- /dev/null
+++ b/groot-tests/test-e2e-clickhouse/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-tests</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>test-e2e-clickhouse</artifactId>
+ <name>Groot : Tests : E2E : ClickHouse</name>
+
+ <properties>
+ <maven.compiler.source>11</maven.compiler.source>
+ <maven.compiler.target>11</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <clickhouse.jdbc.version>0.3.2-patch9</clickhouse.jdbc.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>test-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>clickhouse</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.clickhouse</groupId>
+ <artifactId>clickhouse-jdbc</artifactId>
+ <version>${clickhouse.jdbc.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>connector-clickhouse</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>${snappy-java.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ </dependencies>
+
+</project> \ No newline at end of file
diff --git a/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java b/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java
new file mode 100644
index 0000000..f752d57
--- /dev/null
+++ b/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java
@@ -0,0 +1,363 @@
+package com.geedgenetworks.test.e2e.clickhouse;
+
+import com.geedgenetworks.test.common.TestResource;
+import com.geedgenetworks.test.common.TestSuiteBase;
+import com.geedgenetworks.test.common.container.ContainerUtil;
+import com.geedgenetworks.test.common.container.TestContainer;
+import com.geedgenetworks.test.common.container.TestContainerId;
+import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.ClickHouseContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.*;
+import java.sql.Date;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {TestContainerId.FLINK_1_17},
+ disabledReason = "Override TestSuiteBase @DisabledOnContainer")
+public class ClickHouseIT extends TestSuiteBase implements TestResource {
+ private static final String CLICKHOUSE_DOCKER_IMAGE = "clickhouse/clickhouse-server:23.3.19.32";
+ private static final String DRIVER_CLASS = "com.clickhouse.jdbc.ClickHouseDriver";
+ private static final String INIT_CLICKHOUSE_PATH = "/init/clickhouse_init.conf";
+ private static final String DATABASE = "default";
+ private static final String SOURCE_TABLE = "source_table";
+ private static final String SINK_TABLE = "sink_table";
+ private static final String INSERT_SQL = "insert_sql";
+ private static final String COMPARE_SQL = "compare_sql";
+ private static final String HOST = "clickhouse";
+ private static final Config CONFIG = getInitClickhouseConfig();
+ private ClickHouseContainer clickHouseContainer;
+ private Connection connection;
+ private static final List<Object[]>TEST_DATASET =
+ generateTestDataSet();
+ private static final String[] default_columns = new String[] {
+ "id",
+ "c_map",
+ "c_array_string",
+ "c_array_short",
+ "c_array_int",
+ "c_array_long",
+ "c_array_float",
+ "c_array_double",
+ "c_string",
+ "c_boolean",
+ "c_int8",
+ "c_int16",
+ "c_int32",
+ "c_int64",
+ "c_float32",
+ "c_float64",
+ "c_decimal",
+ "c_date",
+ "c_datetime",
+ "c_nullable",
+ "c_lowcardinality",
+ "c_nested.int",
+ "c_nested.double",
+ "c_nested.string",
+ "c_int128",
+ "c_uint128",
+ "c_int256",
+ "c_uint256",
+ "c_point",
+ "c_ring"
+ };
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ this.clickHouseContainer =
+ new ClickHouseContainer(CLICKHOUSE_DOCKER_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HOST)
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger(CLICKHOUSE_DOCKER_IMAGE)));
+ Startables.deepStart(Stream.of(this.clickHouseContainer)).join();
+ log.info("Clickhouse container started");
+ Awaitility.given()
+ .ignoreExceptions()
+ .await()
+ .atMost(360L, TimeUnit.SECONDS)
+ .untilAsserted(this::initConnection);
+ this.initializeClickhouseTable();
+ this.batchInsertData();
+
+ }
+
+
+ @TestTemplate
+ public void testClickhouse(TestContainer container) throws Exception {
+ assertHasData(SOURCE_TABLE);
+
+ //assertHasData(SINK_TABLE);
+ //compareResult();
+ //clearSinkTable();
+
+ }
+
+ private void assertHasData(String table) {
+ try (Statement statement = connection.createStatement()) {
+ String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+ ResultSet source = statement.executeQuery(sql);
+ Assertions.assertTrue(source.next());
+ } catch (SQLException e) {
+ throw new RuntimeException("test clickhouse server image error", e);
+ }
+ }
+
+ private void clearSinkTable() {
+ try (Statement statement = connection.createStatement()) {
+ statement.execute(String.format("truncate table %s.%s", DATABASE, SINK_TABLE));
+ } catch (SQLException e) {
+ throw new RuntimeException("Test clickhouse server image error", e);
+ }
+ }
+
+ private void compareResult() throws SQLException, IOException {
+ String sourceSql = "select * from " + SOURCE_TABLE + " order by id";
+ String sinkSql = "select * from " + SINK_TABLE + " order by id";
+ List<String> columnList =
+ Arrays.stream(default_columns).collect(Collectors.toList());
+ Statement sourceStatement = connection.createStatement();
+ Statement sinkStatement = connection.createStatement();
+ ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
+ ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+ Assertions.assertEquals(
+ sourceResultSet.getMetaData().getColumnCount(),
+ sinkResultSet.getMetaData().getColumnCount());
+ while (sourceResultSet.next()) {
+ if (sinkResultSet.next()) {
+ for (String column : columnList) {
+ Object source = sourceResultSet.getObject(column);
+ Object sink = sinkResultSet.getObject(column);
+ if (!Objects.deepEquals(source, sink)) {
+ InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column);
+ InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column);
+ String sourceValue =
+ IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
+ String sinkValue =
+ IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
+ Assertions.assertEquals(sourceValue, sinkValue);
+ }
+ Assertions.assertTrue(true);
+ }
+ }
+ }
+ String columns = String.join(",", default_columns);
+ Assertions.assertTrue(
+ compare(String.format(CONFIG.getString(COMPARE_SQL), columns, columns)));
+ }
+
+ private Boolean compare(String sql) {
+ try (Statement statement = connection.createStatement()) {
+ ResultSet resultSet = statement.executeQuery(sql);
+ return !resultSet.next();
+ } catch (SQLException e) {
+ throw new RuntimeException("result compare error", e);
+ }
+ }
+
+
+ private void batchInsertData() {
+ String sql = CONFIG.getString(INSERT_SQL);
+ PreparedStatement preparedStatement = null;
+ try {
+ this.connection.setAutoCommit(true);
+ preparedStatement = this.connection.prepareStatement(sql);
+ for (Object[] row : TEST_DATASET) {
+ preparedStatement.setLong(1, (Long) row[0]);
+ preparedStatement.setObject(2, row[1]);
+ preparedStatement.setArray(3, toSqlArray(row[2]));
+ preparedStatement.setArray(4, toSqlArray(row[3]));
+ preparedStatement.setArray(5, toSqlArray(row[4]));
+ preparedStatement.setArray(6, toSqlArray(row[5]));
+ preparedStatement.setArray(7, toSqlArray(row[6]));
+ preparedStatement.setArray(8, toSqlArray(row[7]));
+ preparedStatement.setString(9, (String) row[8]);
+ preparedStatement.setBoolean(10, (Boolean) row[9]);
+ preparedStatement.setByte(11, (Byte) row[10]);
+ preparedStatement.setShort(12, (Short) row[11]);
+ preparedStatement.setInt(13, (Integer) row[12]);
+ preparedStatement.setLong(14, (Long) row[13]);
+ preparedStatement.setFloat(15, (Float) row[14]);
+ preparedStatement.setDouble(16, (Double) row[15]);
+ preparedStatement.setBigDecimal(17, (BigDecimal) row[16]);
+ preparedStatement.setDate(18, Date.valueOf((LocalDate) row[17]));
+ preparedStatement.setTimestamp(
+ 19, Timestamp.valueOf((LocalDateTime) row[18]));
+ preparedStatement.setInt(20, (Integer) row[19]);
+ preparedStatement.setString(21, (String) row[20]);
+ preparedStatement.setArray(22, toSqlArray(row[21]));
+ preparedStatement.setArray(23, toSqlArray(row[22]));
+ preparedStatement.setArray(24, toSqlArray(row[23]));
+ preparedStatement.setObject(25, row[24]);
+ preparedStatement.setObject(26, row[25]);
+ preparedStatement.setObject(27, row[26]);
+ preparedStatement.setObject(28, row[27]);
+ preparedStatement.setObject(29, row[28]);
+ preparedStatement.setObject(30, row[29]);
+ preparedStatement.addBatch();
+ }
+
+ preparedStatement.executeBatch();
+ preparedStatement.clearBatch();
+
+ } catch (SQLException e) {
+ throw new RuntimeException("Batch insert data failed!", e);
+ } finally {
+ if (preparedStatement != null) {
+ try {
+ preparedStatement.close();
+ } catch (SQLException e) {
+ throw new RuntimeException("PreparedStatement close failed!", e);
+ }
+ }
+ }
+
+
+
+ }
+
+
+ private Array toSqlArray(Object value) throws SQLException {
+ Object[] elements = null;
+ String sqlType = null;
+ if (String[].class.equals(value.getClass())) {
+ sqlType = "TEXT";
+ elements = (String[]) value;
+ } else if (Boolean[].class.equals(value.getClass())) {
+ sqlType = "BOOLEAN";
+ elements = (Boolean[]) value;
+ } else if (Byte[].class.equals(value.getClass())) {
+ sqlType = "TINYINT";
+ elements = (Byte[]) value;
+ } else if (Short[].class.equals(value.getClass())) {
+ sqlType = "SMALLINT";
+ elements = (Short[]) value;
+ } else if (Integer[].class.equals(value.getClass())) {
+ sqlType = "INTEGER";
+ elements = (Integer[]) value;
+ } else if (Long[].class.equals(value.getClass())) {
+ sqlType = "BIGINT";
+ elements = (Long[]) value;
+ } else if (Float[].class.equals(value.getClass())) {
+ sqlType = "REAL";
+ elements = (Float[]) value;
+ } else if (Double[].class.equals(value.getClass())) {
+ sqlType = "DOUBLE";
+ elements = (Double[]) value;
+ }
+ if (sqlType == null) {
+ throw new IllegalArgumentException(
+ "array inject error, not supported data type: " + value.getClass());
+ }
+ return connection.createArrayOf(sqlType, elements);
+ }
+
+ private static List<Object[]> generateTestDataSet() {
+ List<Object[]> rows = new ArrayList<>();
+ for (int i = 0; i < 100; ++i) {
+ Object[] row = new Object[] {
+ (long) i,
+ Collections.singletonMap("key", Integer.parseInt("1")),
+ new String[] {"string"},
+ new Short[] {Short.parseShort("1")},
+ new Integer[] {Integer.parseInt("1")},
+ new Long[] {Long.parseLong("1")},
+ new Float[] {Float.parseFloat("1.1")},
+ new Double[] {Double.parseDouble("1.1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11L, 1),
+ LocalDate.now(),
+ LocalDateTime.now(),
+ i,
+ "string",
+ new Integer[] {Integer.parseInt("1")},
+ new Double[] {Double.parseDouble("1.1")},
+ new String[] {"1"},
+ "170141183460469231731687303715884105727",
+ "340282366920938463463374607431768211455",
+ "57896044618658097711785492504343953926634992332820282019728792003956564819967",
+ "115792089237316195423570985008687907853269984665640564039457584007913129639935",
+ new double[] {1, 2},
+ new double[][] {{2, 3}, {4, 5}}
+ };
+ rows.add(row);
+ }
+ return rows;
+ }
+
+ private void initConnection()
+ throws SQLException, ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
+ final Properties info = new Properties();
+ info.put("user", this.clickHouseContainer.getUsername());
+ info.put("password", this.clickHouseContainer.getPassword());
+ this.connection =
+ ((Driver) Class.forName(DRIVER_CLASS).newInstance())
+ .connect(this.clickHouseContainer.getJdbcUrl(), info);
+ }
+
+ private void initializeClickhouseTable() {
+ try {
+ Statement statement = this.connection.createStatement();
+ statement.execute(CONFIG.getString(SOURCE_TABLE));
+ statement.execute(CONFIG.getString(SINK_TABLE));
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing Clickhouse table failed!", e);
+ }
+ }
+
+ private static Config getInitClickhouseConfig() {
+ File file = ContainerUtil.getResourcesFile(INIT_CLICKHOUSE_PATH);
+ Config config = ConfigFactory.parseFile(file);
+ assert config.hasPath(SOURCE_TABLE)
+ && config.hasPath(SINK_TABLE)
+ && config.hasPath(INSERT_SQL)
+ && config.hasPath(COMPARE_SQL);
+ return config;
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ if (this.connection != null) {
+ this.connection.close();
+ }
+ if (this.clickHouseContainer != null) {
+ this.clickHouseContainer.stop();
+ }
+
+ }
+}
diff --git a/groot-tests/test-e2e-clickhouse/src/test/resources/init/clickhouse_init.conf b/groot-tests/test-e2e-clickhouse/src/test/resources/init/clickhouse_init.conf
new file mode 100644
index 0000000..78f2daa
--- /dev/null
+++ b/groot-tests/test-e2e-clickhouse/src/test/resources/init/clickhouse_init.conf
@@ -0,0 +1,143 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source_table = """
+set allow_experimental_geo_types = 1;
+create table if not exists `default`.source_table(
+ `id` Int64,
+ `c_map` Map(String, Int32),
+ `c_array_string` Array(String),
+ `c_array_short` Array(Int16),
+ `c_array_int` Array(Int32),
+ `c_array_long` Array(Int64),
+ `c_array_float` Array(Float32),
+ `c_array_double` Array(Float64),
+ `c_string` String,
+ `c_boolean` Boolean,
+ `c_int8` Int8,
+ `c_int16` Int16,
+ `c_int32` Int32,
+ `c_int64` Int64,
+ `c_float32` Float32,
+ `c_float64` Float64,
+ `c_decimal` Decimal(9,4),
+ `c_date` Date,
+ `c_datetime` DateTime64,
+ `c_nullable` Nullable(Int32),
+ `c_lowcardinality` LowCardinality(String),
+ `c_nested` Nested
+ (
+ `int` UInt32,
+ `double` Float64,
+ `string` String
+ ),
+ `c_int128` Int128,
+ `c_uint128` UInt128,
+ `c_int256` Int256,
+ `c_uint256` UInt256,
+ `c_point` Point,
+ `c_ring` Ring
+)engine=Memory;
+"""
+
+sink_table = """
+create table if not exists `default`.sink_table(
+ `id` Int64,
+ `c_map` Map(String, Int32),
+ `c_array_string` Array(String),
+ `c_array_short` Array(Int16),
+ `c_array_int` Array(Int32),
+ `c_array_long` Array(Int64),
+ `c_array_float` Array(Float32),
+ `c_array_double` Array(Float64),
+ `c_string` String,
+ `c_boolean` Boolean,
+ `c_int8` Int8,
+ `c_int16` Int16,
+ `c_int32` Int32,
+ `c_int64` Int64,
+ `c_float32` Float32,
+ `c_float64` Float64,
+ `c_decimal` Decimal(9,4),
+ `c_date` Date,
+ `c_datetime` DateTime64,
+ `c_nullable` Nullable(Int32),
+ `c_lowcardinality` LowCardinality(String),
+ `c_nested` Nested
+ (
+ `int` UInt32,
+ `double` Float64,
+ `string` String
+ ),
+ `c_int128` Int128,
+ `c_uint128` UInt128,
+ `c_int256` Int256,
+ `c_uint256` UInt256,
+ `c_point` Point,
+ `c_ring` Ring
+)engine=Memory;
+"""
+
+insert_sql = """
+insert into `default`.source_table
+(
+ `id`,
+ `c_map`,
+ `c_array_string`,
+ `c_array_short`,
+ `c_array_int`,
+ `c_array_long`,
+ `c_array_float`,
+ `c_array_double`,
+ `c_string`,
+ `c_boolean`,
+ `c_int8`,
+ `c_int16`,
+ `c_int32`,
+ `c_int64`,
+ `c_float32`,
+ `c_float64`,
+ `c_decimal`,
+ `c_date`,
+ `c_datetime`,
+ `c_nullable`,
+ `c_lowcardinality`,
+ `c_nested.int`,
+ `c_nested.double`,
+ `c_nested.string`,
+ `c_int128`,
+ `c_uint128`,
+ `c_int256`,
+ `c_uint256`,
+ `c_point`,
+ `c_ring`
+)
+values
+(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
+"""
+
+compare_sql = """
+select
+ %s
+ from (
+ select * from default.source_table
+union all
+ select * from default.sink_table
+ )
+group by %s
+having count(*) < 2
+""" \ No newline at end of file
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaIT.java
index 56108c5..1c40e01 100644
--- a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java
+++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaIT.java
@@ -18,7 +18,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -48,7 +47,7 @@ import static org.awaitility.Awaitility.await;
@DisabledOnContainer(
value = {TestContainerId.FLINK_1_17},
disabledReason = "Override TestSuiteBase @DisabledOnContainer")
-public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
+public class KafkaIT extends TestSuiteBase implements TestResource {
private KafkaContainer kafkaContainer;
@@ -244,8 +243,6 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
});
}
-
-
private void generateTestData(String topic, int start, int end) {
StructType dataType = Types.parseStructType("id: int, client_ip: string, server_ip: string, flag: string");
JsonSerializer serializer = new JsonSerializer(dataType);
diff --git a/pom.xml b/pom.xml
index 5e3e128..69ea712 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,30 @@
</properties>
+ <repositories>
+ <repository>
+ <id>nexus3</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.40.153:8081/repository/public/</url>
+ </repository>
+ <repository>
+ <id>cloudera</id>
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+ </repository>
+ </repositories>
+
+ <distributionManagement>
+ <repository>
+ <uniqueVersion>true</uniqueVersion>
+ <id>platform-releases</id>
+ <url>http://192.168.40.153:8081/repository/platform-release/</url>
+ </repository>
+ <snapshotRepository>
+ <id>platform-snapshots</id>
+ <url>http://192.168.40.153:8081/repository/platform-snapshot/</url>
+ </snapshotRepository>
+ </distributionManagement>
+
<dependencyManagement>
<dependencies>
<dependency>
@@ -967,28 +991,6 @@
</build>
- <repositories>
- <repository>
- <id>nexus3</id>
- <name>Team Nexus Repository</name>
- <url>http://192.168.40.153:8081/repository/public/</url>
- </repository>
- <repository>
- <id>cloudera</id>
- <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
- </repository>
- </repositories>
- <distributionManagement>
- <repository>
- <uniqueVersion>true</uniqueVersion>
- <id>platform-releases</id>
- <url>http://192.168.40.153:8081/repository/platform-release/</url>
- </repository>
- <snapshotRepository>
- <id>platform-snapshots</id>
- <url>http://192.168.40.153:8081/repository/platform-snapshot/</url>
- </snapshotRepository>
- </distributionManagement>
</project>