package com.zdjizhi.topology; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.etl.connection.ConnLogService; import com.zdjizhi.etl.dns.DnsLogService; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import static com.zdjizhi.common.FlowWriteConfig.BUFFER_TIMEOUT; import static com.zdjizhi.common.FlowWriteConfig.LOG_TYPE; public class LogFlowWriteTopology { private static final Log logger = LogFactory.get(); public static void main(String[] args) { try { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //两个输出之间的最大时间 (单位milliseconds) env.setBufferTimeout(BUFFER_TIMEOUT); //1 connection,2 dns if (LOG_TYPE == 1) { ConnLogService.connLogStream(env); } else if (LOG_TYPE == 2) { DnsLogService.dnsLogStream(env); } env.execute(args[0]); } catch (Exception e) { logger.error("This Flink task start ERROR! Exception information is : {}", e); } } }