summaryrefslogtreecommitdiff
path: root/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'src/test')
-rw-r--r--src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java33
-rw-r--r--src/test/resources/data/order-records.csv18
-rw-r--r--src/test/resources/data/user-records.csv4
-rw-r--r--src/test/resources/jobs/job1.yml50
-rw-r--r--src/test/resources/tools/nc.exebin0 -> 45272 bytes
5 files changed, 105 insertions, 0 deletions
diff --git a/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java b/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java
new file mode 100644
index 0000000..deb7693
--- /dev/null
+++ b/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java
@@ -0,0 +1,33 @@
+package com.geedgenetworks.flink.easy.application;
+
+import com.geedgenetworks.flink.easy.core.Runners;
+import org.junit.jupiter.api.Test;
+
+public class ApplicationTest {
+
+ static {
+// System.setProperty("easy.execute.mode", "validate");
+ System.setProperty("flink.rest.bind-port", "8081");
+// System.setProperty("flink.rest.flamegraph.enabled", "true");
+ System.setProperty("flink.heartbeat.timeout", "1800000");
+ }
+
+ public static String discoverConfiguration(final String name) throws Exception {
+ var path = String.format("/jobs/%s.yml", name);
+ var resource = ApplicationTest.class.getResource(path);
+ if (resource == null) {
+ // maven
+ resource = ApplicationTest.class.getResource(String.format("../classes/%s", path));
+ }
+ if (resource == null) {
+ throw new IllegalArgumentException(
+ String.format("Not found job '%s' in path [%s].", name, path));
+ }
+ return resource.getPath();
+ }
+
+ @Test
+ public void testJob() throws Exception {
+ Runners.run(discoverConfiguration("job1"));
+ }
+}
diff --git a/src/test/resources/data/order-records.csv b/src/test/resources/data/order-records.csv
new file mode 100644
index 0000000..76e2cf2
--- /dev/null
+++ b/src/test/resources/data/order-records.csv
@@ -0,0 +1,18 @@
+1,Tom,apple,fruit,15.50,2020-01-01 09:10:05.451
+2,Tom,banana,fruit,12.50,2020-01-01 09:11:02.123
+3,Tom,carrot,vegetable,8.75,2020-01-01 09:34:23.498
+4,Tom,apple,fruit,15.50,2020-01-01 09:45:10.174
+5,John,chocolate,dessert,2400.00,2020-01-01 09:52:37.836
+6,Kevin,cake,dessert,129.00,2020-01-01 10:02:03.537
+7,Kevin,chocolate,dessert,99.00,2020-01-01 10:14:29.381
+8,Kevin,banana,fruit,19.50,2020-01-01 10:22:45.984
+9,Kevin,apple,fruit,15.50,2020-01-01 10:25:05.423
+10,Kevin,strawberry,fruit,8.75,2020-01-01 10:34:38.223
+15,Kevin,apple,fruit,25.50,2020-01-01 10:42:41.436
+11,Susan,chocolate,dessert,49.00,2020-01-01 10:47:31.250
+12,Susan,biscuit,dessert,88.00,2020-01-01 10:52:20.327
+13,Susan,banana,fruit,19.50,2020-01-01 10:52:25.285
+14,Susan,apple,fruit,15.50,2020-01-01 10:52:26.621
+18,Susan,apple,fruit,25.50,2020-01-01 10:52:33.001
+16,Susan,tomato,vegetable,25.50,2020-01-01 11:22:49.158
+17,Susan,beer,drink,39.90,2020-01-01 11:34:04.547 \ No newline at end of file
diff --git a/src/test/resources/data/user-records.csv b/src/test/resources/data/user-records.csv
new file mode 100644
index 0000000..c18de2b
--- /dev/null
+++ b/src/test/resources/data/user-records.csv
@@ -0,0 +1,4 @@
+Tom,true,1990-11-12
+John,false,1996-05-01
+Kevin,true,1988-02-02
+Susan,false,1970-04-13 \ No newline at end of file
diff --git a/src/test/resources/jobs/job1.yml b/src/test/resources/jobs/job1.yml
new file mode 100644
index 0000000..b73b376
--- /dev/null
+++ b/src/test/resources/jobs/job1.yml
@@ -0,0 +1,50 @@
+job:
+ name: A Stream Example
+ parallelism: 1
+ active-pipeline:
+# - stream1
+ - rule1-sink
+
+source:
+ - name: order-records
+ type: socket
+ option:
+ hostname: localhost
+ port: 9999
+ format: csv
+ schema:
+ - name: id # 订单号
+ data-type: BIGINT NOT NULL
+ - name: username # 用户名
+ data-type: STRING NOT NULL
+ - name: product_name # 商品名称
+ data-type: STRING NOT NULL
+ - name: product_type # 商品类型
+ data-type: STRING NOT NULL
+ - name: amount # 金额
+ data-type: DECIMAL(10, 2) NOT NULL
+ - name: formatted_timestamp
+ data-type: STRING NOT NULL
+ - name: event_timestamp
+ for: TO_TIMESTAMP_LTZ(UNIX_TIMESTAMP(formatted_timestamp, 'yyyy-MM-dd HH:mm:ss.SSS'), 3)
+ watermark: event_timestamp - INTERVAL '5' SECOND
+
+sink:
+ - name: rule1-sink
+ based-on: stream1.rule1
+ type: console
+ format: json
+
+pipeline:
+ - name: stream1
+ category: MULTI-RULE
+ based-on: order-records
+ rule:
+ - name: rule1 # 用于查找 在 1 分钟内购买了 3 次及以上,或金额总计大于 1000 的 用户
+ type: AGGREGATE
+# event-driven: true
+ aggregate: COUNT(1) as cnt, SUM(amount) as total_amount
+ group-by: username
+ having: cnt >= 3 || total_amount > 1000
+ with-in: 1 minute
+ slide-step: 10 second \ No newline at end of file
diff --git a/src/test/resources/tools/nc.exe b/src/test/resources/tools/nc.exe
new file mode 100644
index 0000000..79bc027
--- /dev/null
+++ b/src/test/resources/tools/nc.exe
Binary files differ