summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-02-02 18:37:59 +0800
committerwangkuan <[email protected]>2024-02-02 18:37:59 +0800
commiteb2a58daade819453bcb43880db3c6346ec27c73 (patch)
tree2961c5abb3e327869113e09e990c5b09b7289f0a /groot-bootstrap
parent05b53e48d59f6bf1e0cd7c22f9a725bd4c89d857 (diff)
[fix][core]修改包名为小写,调整测试用的Collect包位置
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java4
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java19
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java47
3 files changed, 68 insertions, 2 deletions
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
index 31e53f9..721b733 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
@@ -5,12 +5,12 @@ import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
import com.geedgenetworks.bootstrap.enums.EngineType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.ConfigProvider;
import com.geedgenetworks.common.config.GrootStreamConfig;
-import com.geedgenetworks.core.connector.collect.CollectSink;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigObject;
import com.typesafe.config.ConfigUtil;
@@ -84,7 +84,7 @@ public class SimpleJobTest {
Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString());
Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString());
List<String> asn_list = (List<String>) CollectSink.values.get(0).getExtractedFields().get("asn_list");
- Assert.assertEquals("6167", asn_list.get(1));
+ Assert.assertEquals("6167", asn_list.get(0));
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
new file mode 100644
index 0000000..dfcb459
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
@@ -0,0 +1,19 @@
+package com.geedgenetworks.bootstrap.main.simple.collect;
+
+import com.geedgenetworks.common.Event;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class CollectSink implements SinkFunction<Event> {
+
+ // must be static
+ public static final List<Event> values = Collections.synchronizedList(new ArrayList<>());
+
+ @Override
+ public void invoke(Event value, Context context) throws Exception {
+ values.add(value);
+ }
+}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
new file mode 100644
index 0000000..32a0acd
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
@@ -0,0 +1,47 @@
+package com.geedgenetworks.bootstrap.main.simple.collect;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.sink.SinkProvider;
+import com.geedgenetworks.core.factories.SinkTableFactory;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * 用于测试的sink:把元素输出到标准输出或输出日志,输出类型:1(stdout),2(logInfo),3(logWarn)
+ */
+public class CollectTableFactory implements SinkTableFactory {
+ public static final String IDENTIFIER = "collect";
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public SinkProvider getSinkProvider(Context context) {
+
+ return new SinkProvider(){
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
+ return dataStream.addSink(new CollectSink());
+ }
+ };
+ }
+
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ return options;
+ }
+
+}