package com.zdjizhi.etl;

import cn.hutool.json.JSONUtil;
import com.zdjizhi.utils.ck.CKSink;
import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.Map;

import static com.zdjizhi.common.FlowWriteConfig.SINK_PARALLELISM;

/**
 * Sink helpers for the ETL pipeline: serialize log records to JSON and write them
 * to Kafka, or write them directly to ClickHouse.
 */
public class LogService {

    /**
     * Serializes each record to a JSON string and writes it to the given Kafka sink.
     */
    public static void getLogKafkaSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
        sourceStream
                .map(JSONUtil::toJsonStr)
                .setParallelism(SINK_PARALLELISM)
                .addSink(KafkaProducer.getKafkaProducer(sink))
                .setParallelism(SINK_PARALLELISM)
                .name(sink);
    }

    /**
     * Writes each record to ClickHouse through the given sink.
     */
    public static void getLogCKSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
        sourceStream
                .addSink(new CKSink(sink))
                .setParallelism(SINK_PARALLELISM)
                .name(sink);
    }
}
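
// Usage sketch: one way these helpers might be wired into a Flink job. The class
// name, the sample record, the topic name "example-topic", and the job name are
// illustrative placeholders; the record shape Map<String, Object> is an assumption
// based on the signatures above. Only LogService itself comes from this file.
class LogServiceUsageExample {

    public static void main(String[] args) throws Exception {
        // Fully qualified to avoid adding imports beyond those already in this file.
        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment env =
                org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment();

        // A single fake log record; a real job would read these from a source connector.
        Map<String, Object> logRecord = new java.util.HashMap<>();
        logRecord.put("message", "hello");
        DataStream<Map<String, Object>> logs = env.fromElements(logRecord);

        // Serialize each record to JSON and write it to the (hypothetical) Kafka topic.
        LogService.getLogKafkaSink(logs, "example-topic");

        env.execute("log-service-usage-example");
    }
}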