1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
package com.zdjizhi.utils.arangodb;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.arangodb.entity.BaseEdgeDocument;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.*;
import static com.zdjizhi.common.FlowWriteConfig.ARANGODB_BATCH;
import static com.zdjizhi.common.FlowWriteConfig.SINK_ARANGODB_BATCH_DELAY_TIME;
public class AGSink extends RichSinkFunction<BaseEdgeDocument> {
private static final Log log = LogFactory.get();
private static final long serialVersionUID = 1L;
private static final Log logger = LogFactory.get();
// ClickHouse 的集群 IP 和 数据进行绑定存储,记录数据写出的 ClickHouse IP
private final CopyOnWriteArrayList<BaseEdgeDocument> ipWithDataList;
// 满足此时间条件写出数据
private final long insertArangoTimeInterval = SINK_ARANGODB_BATCH_DELAY_TIME;
// 插入的批次
private final int insertArangoBatchSize = ARANGODB_BATCH; // 开发测试用10条
private static ArangoDBConnect arangoDBConnect;
private transient volatile boolean closed = false;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
// 数据表名
private String sink;
public AGSink(String sink) {
this.sink = sink;
this.ipWithDataList = new CopyOnWriteArrayList<>();
}
public String getSink() {
return sink;
}
/**
* Connects to the target database and initializes the prepared statement.
*/
@Override
public void open(Configuration parameters) throws Exception {
arangoDBConnect = ArangoDBConnect.getInstance();
if (insertArangoTimeInterval != 0 && insertArangoBatchSize != 1) {
this.scheduler = Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("arangodb-upsert-output-format"));
this.scheduledFuture =
this.scheduler.scheduleWithFixedDelay(
() -> {
synchronized (AGSink.this) {
if (!closed) {
try {
logger.debug("arangodb_flush.............");
flush(ipWithDataList);
} catch (Exception e) {
log.error(e);
}
}
}
},
insertArangoTimeInterval,
insertArangoTimeInterval,
TimeUnit.MILLISECONDS);
}
}
@Override
public final synchronized void invoke(BaseEdgeDocument row, Context context) throws IOException {
ipWithDataList.add(row);
if (ipWithDataList.size() >= this.insertArangoBatchSize) {
try {
flush(ipWithDataList);
} catch (Exception e) {
logger.error("ck sink invoke flush failed.", e);
}
}
}
// 插入数据
private synchronized void flush(List<BaseEdgeDocument> data) throws SQLException {
if (data.size() > 0) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
log.debug("开始写入arangodb数据 :{}", data.size());
arangoDBConnect.overwrite(data, sink);
stopWatch.stop();
log.debug("总共花费时间 {} ms", stopWatch.getTime());
log.debug("写入arangodb表{},数据 {}", sink, data.size());
ipWithDataList.clear();
}
}
/**
* Executes prepared statement and closes all resources of this instance.
*
* @throws IOException Thrown, if the input could not be closed properly.
*/
@Override
public synchronized void close() throws IOException {
if (!closed) {
closed = true;
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}
if (arangoDBConnect != null) {
try {
flush(ipWithDataList);
} catch (SQLException e) {
log.error("JDBC statement could not be closed: " + e.getMessage());
} finally {
try {
arangoDBConnect.clean();
} catch (Exception e) {
log.error("JDBC connection could not be closed: " + e.getMessage());
}
}
}
}
}
}
|