package com.zdjizhi.utils.arangodb; import com.arangodb.entity.BaseEdgeDocument; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.util.List; /** * @description: * @author: zhq * @create: 2022-07-07 **/ public class ArangoDBSink extends RichSinkFunction> { private static ArangoDBConnect arangoDBConnect; private String collection; @Override public void invoke(List baseEdgeDocuments, Context context) throws Exception { arangoDBConnect.overwrite(baseEdgeDocuments, getCollection()); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); arangoDBConnect = ArangoDBConnect.getInstance(); } @Override public void close() throws Exception { super.close(); arangoDBConnect.clean(); } public ArangoDBSink(String collection) { this.collection = collection; } public String getCollection() { return collection; } public void setCollection(String collection) { this.collection = collection; } }