package com.zdjizhi.etl.connection; import com.alibaba.fastjson.util.TypeUtils; import com.zdjizhi.utils.json.TypeUtil; import org.apache.flink.api.common.functions.MapFunction; import java.util.Map; public class ConnTimeMapFunction implements MapFunction, Map> { @Override public Map map(Map value) throws Exception { value.put("conn_start_time", TypeUtil.coverMSToS(value.get("conn_start_time"))); value.put("log_gen_time", TypeUtil.coverMSToS(value.get("log_gen_time"))); value.put("total_cs_pkts", TypeUtils.castToLong(value.get("total_cs_pkts"))); value.put("total_sc_pkts", TypeUtils.castToLong(value.get("total_sc_pkts"))); value.put("total_cs_bytes", TypeUtils.castToLong(value.get("total_cs_bytes"))); value.put("total_sc_bytes", TypeUtils.castToLong(value.get("total_sc_bytes"))); return value; } }