summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorchaochaoc <[email protected]>2024-08-22 11:27:01 +0800
committerchaochaoc <[email protected]>2024-08-22 11:27:01 +0800
commitafffd7e4894826c717f0a3e3b2bfb0c202895b5a (patch)
tree3fc7b03c55da8b717aa0dc1e7f27689826162a98 /src
feat: update versionfeature/easy-stream-1.4
Diffstat (limited to 'src')
-rw-r--r--src/main/resources/log4j2.properties32
-rw-r--r--src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java33
-rw-r--r--src/test/resources/data/order-records.csv18
-rw-r--r--src/test/resources/data/user-records.csv4
-rw-r--r--src/test/resources/jobs/job1.yml50
-rw-r--r--src/test/resources/tools/nc.exebin0 -> 45272 bytes
6 files changed, 137 insertions, 0 deletions
diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..d2455b2
--- /dev/null
+++ b/src/main/resources/log4j2.properties
@@ -0,0 +1,32 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = ERROR
+rootLogger.appenderRefs = console
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-20c %x - %m%n
+
+logger.easy.name = com.geedgenetworks.flink.easy
+logger.easy.level = DEBUG
+logger.easy.appenderRefs = console
+logger.easy.appenderRef.console.ref = ConsoleAppender
+logger.easy.additivity = false
diff --git a/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java b/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java
new file mode 100644
index 0000000..deb7693
--- /dev/null
+++ b/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java
@@ -0,0 +1,33 @@
+package com.geedgenetworks.flink.easy.application;
+
+import com.geedgenetworks.flink.easy.core.Runners;
+import org.junit.jupiter.api.Test;
+
+public class ApplicationTest {
+
+ static {
+// System.setProperty("easy.execute.mode", "validate");
+ System.setProperty("flink.rest.bind-port", "8081");
+// System.setProperty("flink.rest.flamegraph.enabled", "true");
+ System.setProperty("flink.heartbeat.timeout", "1800000");
+ }
+
+ public static String discoverConfiguration(final String name) throws Exception {
+ var path = String.format("/jobs/%s.yml", name);
+ var resource = ApplicationTest.class.getResource(path);
+ if (resource == null) {
+ // maven
+ resource = ApplicationTest.class.getResource(String.format("../classes/%s", path));
+ }
+ if (resource == null) {
+ throw new IllegalArgumentException(
+ String.format("Not found job '%s' in path [%s].", name, path));
+ }
+ return resource.getPath();
+ }
+
+ @Test
+ public void testJob() throws Exception {
+ Runners.run(discoverConfiguration("job1"));
+ }
+}
diff --git a/src/test/resources/data/order-records.csv b/src/test/resources/data/order-records.csv
new file mode 100644
index 0000000..76e2cf2
--- /dev/null
+++ b/src/test/resources/data/order-records.csv
@@ -0,0 +1,18 @@
+1,Tom,apple,fruit,15.50,2020-01-01 09:10:05.451
+2,Tom,banana,fruit,12.50,2020-01-01 09:11:02.123
+3,Tom,carrot,vegetable,8.75,2020-01-01 09:34:23.498
+4,Tom,apple,fruit,15.50,2020-01-01 09:45:10.174
+5,John,chocolate,dessert,2400.00,2020-01-01 09:52:37.836
+6,Kevin,cake,dessert,129.00,2020-01-01 10:02:03.537
+7,Kevin,chocolate,dessert,99.00,2020-01-01 10:14:29.381
+8,Kevin,banana,fruit,19.50,2020-01-01 10:22:45.984
+9,Kevin,apple,fruit,15.50,2020-01-01 10:25:05.423
+10,Kevin,strawberry,fruit,8.75,2020-01-01 10:34:38.223
+15,Kevin,apple,fruit,25.50,2020-01-01 10:42:41.436
+11,Susan,chocolate,dessert,49.00,2020-01-01 10:47:31.250
+12,Susan,biscuit,dessert,88.00,2020-01-01 10:52:20.327
+13,Susan,banana,fruit,19.50,2020-01-01 10:52:25.285
+14,Susan,apple,fruit,15.50,2020-01-01 10:52:26.621
+18,Susan,apple,fruit,25.50,2020-01-01 10:52:33.001
+16,Susan,tomato,vegetable,25.50,2020-01-01 11:22:49.158
+17,Susan,beer,drink,39.90,2020-01-01 11:34:04.547 \ No newline at end of file
diff --git a/src/test/resources/data/user-records.csv b/src/test/resources/data/user-records.csv
new file mode 100644
index 0000000..c18de2b
--- /dev/null
+++ b/src/test/resources/data/user-records.csv
@@ -0,0 +1,4 @@
+Tom,true,1990-11-12
+John,false,1996-05-01
+Kevin,true,1988-02-02
+Susan,false,1970-04-13 \ No newline at end of file
diff --git a/src/test/resources/jobs/job1.yml b/src/test/resources/jobs/job1.yml
new file mode 100644
index 0000000..b73b376
--- /dev/null
+++ b/src/test/resources/jobs/job1.yml
@@ -0,0 +1,50 @@
+job:
+ name: A Stream Example
+ parallelism: 1
+ active-pipeline:
+# - stream1
+ - rule1-sink
+
+source:
+ - name: order-records
+ type: socket
+ option:
+ hostname: localhost
+ port: 9999
+ format: csv
+ schema:
+ - name: id # 订单号
+ data-type: BIGINT NOT NULL
+ - name: username # 用户名
+ data-type: STRING NOT NULL
+ - name: product_name # 商品名称
+ data-type: STRING NOT NULL
+ - name: product_type # 商品类型
+ data-type: STRING NOT NULL
+ - name: amount # 金额
+ data-type: DECIMAL(10, 2) NOT NULL
+ - name: formatted_timestamp
+ data-type: STRING NOT NULL
+ - name: event_timestamp
+ for: TO_TIMESTAMP_LTZ(UNIX_TIMESTAMP(formatted_timestamp, 'yyyy-MM-dd HH:mm:ss.SSS'), 3)
+ watermark: event_timestamp - INTERVAL '5' SECOND
+
+sink:
+ - name: rule1-sink
+ based-on: stream1.rule1
+ type: console
+ format: json
+
+pipeline:
+ - name: stream1
+ category: MULTI-RULE
+ based-on: order-records
+ rule:
+ - name: rule1 # 用于查找 在 1 分钟内购买了 3 次及以上,或金额总计大于 1000 的 用户
+ type: AGGREGATE
+# event-driven: true
+ aggregate: COUNT(1) as cnt, SUM(amount) as total_amount
+ group-by: username
+ having: cnt >= 3 || total_amount > 1000
+ with-in: 1 minute
+ slide-step: 10 second \ No newline at end of file
diff --git a/src/test/resources/tools/nc.exe b/src/test/resources/tools/nc.exe
new file mode 100644
index 0000000..79bc027
--- /dev/null
+++ b/src/test/resources/tools/nc.exe
Binary files differ