diff options
| author | chaochaoc <[email protected]> | 2024-08-22 11:27:01 +0800 |
|---|---|---|
| committer | chaochaoc <[email protected]> | 2024-08-22 11:27:01 +0800 |
| commit | afffd7e4894826c717f0a3e3b2bfb0c202895b5a (patch) | |
| tree | 3fc7b03c55da8b717aa0dc1e7f27689826162a98 /src | |
feat: update versionfeature/easy-stream-1.4
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/resources/log4j2.properties | 32 | ||||
| -rw-r--r-- | src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java | 33 | ||||
| -rw-r--r-- | src/test/resources/data/order-records.csv | 18 | ||||
| -rw-r--r-- | src/test/resources/data/user-records.csv | 4 | ||||
| -rw-r--r-- | src/test/resources/jobs/job1.yml | 50 | ||||
| -rw-r--r-- | src/test/resources/tools/nc.exe | bin | 0 -> 45272 bytes |
6 files changed, 137 insertions, 0 deletions
diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties new file mode 100644 index 0000000..d2455b2 --- /dev/null +++ b/src/main/resources/log4j2.properties @@ -0,0 +1,32 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = ERROR +rootLogger.appenderRefs = console +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-20c %x - %m%n + +logger.easy.name = com.geedgenetworks.flink.easy +logger.easy.level = DEBUG +logger.easy.appenderRefs = console +logger.easy.appenderRef.console.ref = ConsoleAppender +logger.easy.additivity = false diff --git a/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java b/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java new file mode 100644 index 0000000..deb7693 --- /dev/null +++ b/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java @@ -0,0 +1,33 @@ +package com.geedgenetworks.flink.easy.application; + +import com.geedgenetworks.flink.easy.core.Runners; +import org.junit.jupiter.api.Test; + +public class ApplicationTest { + + static { +// System.setProperty("easy.execute.mode", "validate"); + System.setProperty("flink.rest.bind-port", "8081"); +// System.setProperty("flink.rest.flamegraph.enabled", "true"); + System.setProperty("flink.heartbeat.timeout", "1800000"); + } + + public static String discoverConfiguration(final String name) throws Exception { + var path = String.format("/jobs/%s.yml", name); + var resource = ApplicationTest.class.getResource(path); + if (resource == null) { + // maven + resource = ApplicationTest.class.getResource(String.format("../classes/%s", path)); + } + if (resource == null) { + throw new IllegalArgumentException( + String.format("Not found job '%s' in path [%s].", name, path)); + } + return resource.getPath(); + } + + @Test + public void testJob() throws Exception { + Runners.run(discoverConfiguration("job1")); + } +} diff --git a/src/test/resources/data/order-records.csv b/src/test/resources/data/order-records.csv new file mode 100644 index 0000000..76e2cf2 --- /dev/null +++ b/src/test/resources/data/order-records.csv @@ -0,0 +1,18 @@ +1,Tom,apple,fruit,15.50,2020-01-01 09:10:05.451 +2,Tom,banana,fruit,12.50,2020-01-01 09:11:02.123 +3,Tom,carrot,vegetable,8.75,2020-01-01 09:34:23.498 +4,Tom,apple,fruit,15.50,2020-01-01 09:45:10.174 +5,John,chocolate,dessert,2400.00,2020-01-01 09:52:37.836 +6,Kevin,cake,dessert,129.00,2020-01-01 10:02:03.537 +7,Kevin,chocolate,dessert,99.00,2020-01-01 10:14:29.381 +8,Kevin,banana,fruit,19.50,2020-01-01 10:22:45.984 +9,Kevin,apple,fruit,15.50,2020-01-01 10:25:05.423 +10,Kevin,strawberry,fruit,8.75,2020-01-01 10:34:38.223 +15,Kevin,apple,fruit,25.50,2020-01-01 10:42:41.436 +11,Susan,chocolate,dessert,49.00,2020-01-01 10:47:31.250 +12,Susan,biscuit,dessert,88.00,2020-01-01 10:52:20.327 +13,Susan,banana,fruit,19.50,2020-01-01 10:52:25.285 +14,Susan,apple,fruit,15.50,2020-01-01 10:52:26.621 +18,Susan,apple,fruit,25.50,2020-01-01 10:52:33.001 +16,Susan,tomato,vegetable,25.50,2020-01-01 11:22:49.158 +17,Susan,beer,drink,39.90,2020-01-01 11:34:04.547
\ No newline at end of file diff --git a/src/test/resources/data/user-records.csv b/src/test/resources/data/user-records.csv new file mode 100644 index 0000000..c18de2b --- /dev/null +++ b/src/test/resources/data/user-records.csv @@ -0,0 +1,4 @@ +Tom,true,1990-11-12 +John,false,1996-05-01 +Kevin,true,1988-02-02 +Susan,false,1970-04-13
\ No newline at end of file diff --git a/src/test/resources/jobs/job1.yml b/src/test/resources/jobs/job1.yml new file mode 100644 index 0000000..b73b376 --- /dev/null +++ b/src/test/resources/jobs/job1.yml @@ -0,0 +1,50 @@ +job: + name: A Stream Example + parallelism: 1 + active-pipeline: +# - stream1 + - rule1-sink + +source: + - name: order-records + type: socket + option: + hostname: localhost + port: 9999 + format: csv + schema: + - name: id # 订单号 + data-type: BIGINT NOT NULL + - name: username # 用户名 + data-type: STRING NOT NULL + - name: product_name # 商品名称 + data-type: STRING NOT NULL + - name: product_type # 商品类型 + data-type: STRING NOT NULL + - name: amount # 金额 + data-type: DECIMAL(10, 2) NOT NULL + - name: formatted_timestamp + data-type: STRING NOT NULL + - name: event_timestamp + for: TO_TIMESTAMP_LTZ(UNIX_TIMESTAMP(formatted_timestamp, 'yyyy-MM-dd HH:mm:ss.SSS'), 3) + watermark: event_timestamp - INTERVAL '5' SECOND + +sink: + - name: rule1-sink + based-on: stream1.rule1 + type: console + format: json + +pipeline: + - name: stream1 + category: MULTI-RULE + based-on: order-records + rule: + - name: rule1 # 用于查找 在 1 分钟内购买了 3 次及以上,或金额总计大于 1000 的 用户 + type: AGGREGATE +# event-driven: true + aggregate: COUNT(1) as cnt, SUM(amount) as total_amount + group-by: username + having: cnt >= 3 || total_amount > 1000 + with-in: 1 minute + slide-step: 10 second
\ No newline at end of file diff --git a/src/test/resources/tools/nc.exe b/src/test/resources/tools/nc.exe Binary files differnew file mode 100644 index 0000000..79bc027 --- /dev/null +++ b/src/test/resources/tools/nc.exe |
