diff options
| author | wangkuan <[email protected]> | 2024-02-02 18:37:59 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-02-02 18:37:59 +0800 |
| commit | eb2a58daade819453bcb43880db3c6346ec27c73 (patch) | |
| tree | 2961c5abb3e327869113e09e990c5b09b7289f0a /groot-bootstrap/src | |
| parent | 05b53e48d59f6bf1e0cd7c22f9a725bd4c89d857 (diff) | |
[fix][core]修改包名为小写,调整测试用的Collect包位置
Diffstat (limited to 'groot-bootstrap/src')
3 files changed, 68 insertions, 2 deletions
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java index 31e53f9..721b733 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java @@ -5,12 +5,12 @@ import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; import com.geedgenetworks.bootstrap.enums.EngineType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; import com.geedgenetworks.bootstrap.utils.CommandLineUtils; import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.ConfigProvider; import com.geedgenetworks.common.config.GrootStreamConfig; -import com.geedgenetworks.core.connector.collect.CollectSink; import com.typesafe.config.Config; import com.typesafe.config.ConfigObject; import com.typesafe.config.ConfigUtil; @@ -84,7 +84,7 @@ public class SimpleJobTest { Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString()); Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString()); List<String> asn_list = (List<String>) CollectSink.values.get(0).getExtractedFields().get("asn_list"); - Assert.assertEquals("6167", asn_list.get(1)); + Assert.assertEquals("6167", asn_list.get(0)); } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java new file mode 100644 index 0000000..dfcb459 --- /dev/null +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java @@ -0,0 +1,19 @@ +package com.geedgenetworks.bootstrap.main.simple.collect; + +import com.geedgenetworks.common.Event; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class CollectSink implements SinkFunction<Event> { + + // must be static + public static final List<Event> values = Collections.synchronizedList(new ArrayList<>()); + + @Override + public void invoke(Event value, Context context) throws Exception { + values.add(value); + } +} diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java new file mode 100644 index 0000000..32a0acd --- /dev/null +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java @@ -0,0 +1,47 @@ +package com.geedgenetworks.bootstrap.main.simple.collect; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.connector.sink.SinkProvider; +import com.geedgenetworks.core.factories.SinkTableFactory; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; + +import java.util.HashSet; +import java.util.Set; + +/** + * 用于测试的sink:把元素输出到标准输出或输出日志,输出类型:1(stdout),2(logInfo),3(logWarn) + */ +public class CollectTableFactory implements SinkTableFactory { + public static final String IDENTIFIER = "collect"; + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public SinkProvider getSinkProvider(Context context) { + + return new SinkProvider(){ + @Override + public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) { + return dataStream.addSink(new CollectSink()); + } + }; + } + + + @Override + public Set<ConfigOption<?>> requiredOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + return options; + } + +} |
