Diffstat (limited to 'ip-learning-spark/src/main')
-rw-r--r--  ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java                103
-rw-r--r--  ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java   85
-rw-r--r--  ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java              116
-rw-r--r--  ip-learning-spark/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java            67
-rw-r--r--  ip-learning-spark/src/main/resources/application.properties                        25
-rw-r--r--  ip-learning-spark/src/main/resources/log4j.properties                              25
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala           5
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala             5
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala         5
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala   5
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala    5
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala      5
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/spark/partition/CustomPartitioner.scala  11
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala             5
14 files changed, 467 insertions, 0 deletions
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java
new file mode 100644
index 0000000..af47dcf
--- /dev/null
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java
@@ -0,0 +1,103 @@
+package cn.ac.iie.dao;
+
+import cn.ac.iie.config.ApplicationConfig;
+import cn.ac.iie.service.read.ReadHistoryArangoData;
+import cn.ac.iie.utils.ArangoDBConnect;
+import cn.ac.iie.utils.ExecutorThreadPool;
+import com.arangodb.ArangoCursor;
+import com.arangodb.entity.BaseDocument;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Loads the historical graph data from ArangoDB into in-memory maps.
+ *
+ * @author wlh
+ */
+public class BaseArangoData {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class);
+
+ static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexFqdnMap = new ConcurrentHashMap<>();
+ static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexIpMap = new ConcurrentHashMap<>();
+ static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexSubscriberMap = new ConcurrentHashMap<>();
+ static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>();
+ static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>();
+ static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>();
+ static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>();
+
+ private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance();
+
+ private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance();
+
+ <T extends BaseDocument> void readHistoryData(String table,
+ ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyMap,
+ Class<T> type) {
+ try {
+ LOG.info("开始更新" + table);
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
+ historyMap.put(i, new ConcurrentHashMap<>());
+ }
+ CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
+ long[] timeRange = getTimeRange(table);
+ for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
+ String sql = getQuerySql(timeRange, i, table);
+ ReadHistoryArangoData<T> readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, historyMap, type, table, countDownLatch);
+ threadPool.executor(readHistoryArangoData);
+ }
+ countDownLatch.await();
+ long last = System.currentTimeMillis();
+ LOG.info("读取" + table + " arangoDB 共耗时:" + (last - start));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private long[] getTimeRange(String table) {
+ long minTime = 0L;
+ long maxTime = 0L;
+ long startTime = System.currentTimeMillis();
+ String sql = "LET doc = (FOR doc IN " + table + " RETURN doc) return {max_time:MAX(doc[*].FIRST_FOUND_TIME),min_time:MIN(doc[*].FIRST_FOUND_TIME)}";
+ switch (ApplicationConfig.ARANGO_TIME_LIMIT_TYPE) {
+ case 0:
+ ArangoCursor<BaseDocument> timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class);
+ try {
+ if (timeDoc != null) {
+ while (timeDoc.hasNext()) {
+ BaseDocument doc = timeDoc.next();
+ maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER; // pad so the last per-thread window still covers max_time after integer division
+ minTime = Long.parseLong(doc.getAttribute("min_time").toString());
+ }
+ } else {
+ LOG.warn("获取ArangoDb时间范围为空");
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to parse the ArangoDB time range", e);
+ }
+ break;
+ case 1:
+ maxTime = ApplicationConfig.READ_ARANGO_MAX_TIME;
+ minTime = ApplicationConfig.READ_ARANGO_MIN_TIME;
+ break;
+ default: // unknown ARANGO_TIME_LIMIT_TYPE: leave the range at [0, 0]
+ }
+ long lastTime = System.currentTimeMillis();
+ LOG.info(sql + "\n查询最大最小时间用时:" + (lastTime - startTime));
+ return new long[]{minTime, maxTime};
+
+ }
+
+ private String getQuerySql(long[] timeRange, int threadNumber, String table) {
+ long minTime = timeRange[0];
+ long maxTime = timeRange[1];
+ long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER; // width of each thread's time window
+ long maxThreadTime = minTime + (threadNumber + 1) * diffTime;
+ long minThreadTime = minTime + threadNumber * diffTime;
+ return "FOR doc IN " + table + " filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " " + ApplicationConfig.ARANGODB_READ_LIMIT + " RETURN doc";
+ }
+
+}
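
The two private helpers above implement a simple sharded scan: getTimeRange finds the global [min_time, max_time] of FIRST_FOUND_TIME, and getQuerySql hands each of the THREAD_POOL_NUMBER threads an equal slice of that range. A standalone sketch of the arithmetic (class name and sample bounds are illustrative only):

    import java.util.ArrayList;
    import java.util.List;

    public class TimeWindowSketch {
        public static void main(String[] args) {
            long minTime = 1_571_245_199L;      // sample bounds, echoing application.properties
            long maxTime = 1_571_284_799L + 10; // padded by the thread count, as getTimeRange does
            int threads = 10;
            long width = (maxTime - minTime) / threads;
            List<long[]> windows = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                // window i covers [minTime + i*width, minTime + (i+1)*width]
                windows.add(new long[]{minTime + i * width, minTime + (i + 1) * width});
            }
            for (long[] w : windows) {
                System.out.println("FILTER doc.FIRST_FOUND_TIME >= " + w[0]
                        + " AND doc.FIRST_FOUND_TIME <= " + w[1]);
            }
        }
    }

Because the AQL filter uses >= and <=, a document sitting exactly on a shared boundary is read by two threads; that is harmless here, since the per-bucket maps are keyed by document key and the second read simply overwrites the first.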
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
new file mode 100644
index 0000000..0b4eda5
--- /dev/null
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
@@ -0,0 +1,85 @@
+package cn.ac.iie.service.read;
+
+import cn.ac.iie.config.ApplicationConfig;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.ArangoCursor;
+import com.arangodb.entity.BaseDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import static cn.ac.iie.service.read.ReadClickhouseData.RECENT_COUNT_HOUR;
+
+/**
+ * Reads the full ArangoDB history across multiple threads and packs it into maps.
+ * @author wlh
+ */
+public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
+ private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class);
+
+ private ArangoDBConnect arangoConnect;
+ private String query;
+ private ConcurrentHashMap<Integer,ConcurrentHashMap<String, T>> map;
+ private Class<T> type;
+ private String table;
+ private CountDownLatch countDownLatch;
+
+ public ReadHistoryArangoData(ArangoDBConnect arangoConnect,
+ String query,
+ ConcurrentHashMap<Integer,ConcurrentHashMap<String, T>> map,
+ Class<T> type,
+ String table,
+ CountDownLatch countDownLatch) {
+ this.arangoConnect = arangoConnect;
+ this.query = query;
+ this.map = map;
+ this.type = type;
+ this.table = table;
+ this.countDownLatch = countDownLatch;
+ }
+
+ @Override
+ public void run() {
+ try {
+ long s = System.currentTimeMillis();
+ ArangoCursor<T> docs = arangoConnect.executorQuery(query, type);
+ if (docs != null) {
+ List<T> baseDocuments = docs.asListRemaining();
+ int i = 0;
+ for (T doc : baseDocuments) {
+ String key = doc.getKey();
+ int bucket = Math.floorMod(key.hashCode(), ApplicationConfig.THREAD_POOL_NUMBER); // floorMod, not Math.abs: abs(Integer.MIN_VALUE) is still negative and would miss every bucket
+ ConcurrentHashMap<String, T> tmpMap = map.get(bucket);
+ tmpMap.put(key, doc);
+ i++;
+ }
+ long l = System.currentTimeMillis();
+ LOG.info(query + "\n读取" + i + "条数据,运行时间:" + (l - s));
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to read a history slice: " + query, e);
+ } finally {
+ countDownLatch.countDown();
+ LOG.info("Slice done; threads still pending: " + countDownLatch.getCount());
+ }
+ }
+
+ private void updateProtocolDocument(T doc) { // not yet called in this commit: ages the per-protocol hourly counters by one hour
+ if (doc.getProperties().containsKey("PROTOCOL_TYPE")) {
+ for (String protocol : ReadClickhouseData.PROTOCOL_SET) {
+ String protocolRecent = protocol + "_CNT_RECENT";
+ ArrayList<Long> cntRecent = (ArrayList<Long>) doc.getAttribute(protocolRecent);
+ Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]);
+ Long[] cntRecentsDst = new Long[RECENT_COUNT_HOUR];
+ System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1); // shift counts one slot right (the last entry falls off) and free index 0 for the new hour
+ cntRecentsDst[0] = 0L;
+ doc.addAttribute(protocolRecent, cntRecentsDst);
+ }
+ }
+ }
+
+}
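
updateProtocolDocument ages a fixed-length array of per-protocol hourly counters: everything shifts one slot toward the tail, the oldest hour falls off, and index 0 is zeroed for the hour that just began. A self-contained sketch of that window shift (the array length and values are made up):

    import java.util.Arrays;

    public class HourlyWindowSketch {
        public static void main(String[] args) {
            // counters for the last 6 hours, index 0 = current hour
            Long[] counts = {4L, 9L, 2L, 7L, 1L, 5L};
            Long[] shifted = new Long[counts.length];
            // hours 0..4 move to slots 1..5; hour 5, the oldest, falls off
            System.arraycopy(counts, 0, shifted, 1, counts.length - 1);
            shifted[0] = 0L; // fresh slot for the new hour
            System.out.println(Arrays.toString(shifted)); // [0, 4, 9, 2, 7, 1]
        }
    }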
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
new file mode 100644
index 0000000..fc62f08
--- /dev/null
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
@@ -0,0 +1,116 @@
+package cn.ac.iie.utils;
+
+import cn.ac.iie.config.ApplicationConfig;
+import com.arangodb.ArangoCollection;
+import com.arangodb.ArangoCursor;
+import com.arangodb.ArangoDB;
+import com.arangodb.ArangoDatabase;
+import com.arangodb.entity.DocumentCreateEntity;
+import com.arangodb.entity.ErrorEntity;
+import com.arangodb.entity.MultiDocumentEntity;
+import com.arangodb.model.AqlQueryOptions;
+import com.arangodb.model.DocumentCreateOptions;
+import com.arangodb.util.MapBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+public class ArangoDBConnect {
+ private static final Logger LOG = LoggerFactory.getLogger(ArangoDBConnect.class);
+ private static ArangoDB arangoDB = null;
+ private static ArangoDBConnect conn = null;
+ static {
+ initClient();
+ }
+
+ private static void initClient() {
+ arangoDB = new ArangoDB.Builder()
+ .maxConnections(ApplicationConfig.THREAD_POOL_NUMBER)
+ .host(ApplicationConfig.ARANGODB_HOST, ApplicationConfig.ARANGODB_PORT)
+ .user(ApplicationConfig.ARANGODB_USER)
+ .password(ApplicationConfig.ARANGODB_PASSWORD)
+ .build();
+ }
+
+ public static synchronized ArangoDBConnect getInstance(){
+ if (null == conn){
+ conn = new ArangoDBConnect();
+ }
+ return conn;
+ }
+
+ private ArangoDatabase getDatabase(){
+ return arangoDB.db(ApplicationConfig.ARANGODB_DB_NAME);
+ }
+
+ public void clean(){
+ try {
+ if (arangoDB != null){
+ arangoDB.shutdown();
+ }
+ }catch (Exception e){
+ e.printStackTrace();
+ }
+ }
+
+ public <T> ArangoCursor<T> executorQuery(String query,Class<T> type){
+ ArangoDatabase database = getDatabase();
+ Map<String, Object> bindVars = new MapBuilder().get();
+ AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL);
+ try {
+ return database.query(query, bindVars, options, type);
+ } catch (Exception e) {
+ LOG.error("AQL query failed: " + query, e);
+ return null;
+ }finally {
+ bindVars.clear();
+ }
+ }
+
+ @Deprecated
+ public <T> void insertAndUpdate(ArrayList<T> docInsert,ArrayList<T> docUpdate,String collectionName){
+ ArangoDatabase database = getDatabase();
+ try {
+ ArangoCollection collection = database.collection(collectionName);
+ if (!docInsert.isEmpty()){
+ collection.importDocuments(docInsert);
+ }
+ if (!docUpdate.isEmpty()){
+ collection.replaceDocuments(docUpdate);
+ }
+ } catch (Exception e) {
+ LOG.error("Insert/update failed for collection " + collectionName, e);
+ } finally {
+ // clear both buffers so the caller can reuse the lists for the next batch
+ docInsert.clear();
+ docUpdate.clear();
+ }
+ }
+
+ public <T> void overwrite(ArrayList<T> docOverwrite,String collectionName){
+ ArangoDatabase database = getDatabase();
+ try {
+ ArangoCollection collection = database.collection(collectionName);
+ if (!docOverwrite.isEmpty()){
+ DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions();
+ documentCreateOptions.overwrite(true);
+ documentCreateOptions.silent(true);
+ MultiDocumentEntity<DocumentCreateEntity<T>> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions);
+ Collection<ErrorEntity> errors = documentCreateEntityMultiDocumentEntity.getErrors();
+ for (ErrorEntity errorEntity:errors){
+ LOG.debug("写入arangoDB异常:"+errorEntity.getErrorMessage());
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Overwrite failed for collection " + collectionName, e);
+ }finally {
+ docOverwrite.clear();
+ }
+ }
+
+
+
+}
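
overwrite is the effective write path: insertDocuments with overwrite(true) acts as an upsert keyed on _key, and silent(true) suppresses the per-document metadata in the response. A usage sketch (the collection name and attribute values are invented for illustration):

    import cn.ac.iie.utils.ArangoDBConnect;
    import com.arangodb.entity.BaseDocument;

    import java.util.ArrayList;

    public class OverwriteSketch {
        public static void main(String[] args) {
            ArangoDBConnect conn = ArangoDBConnect.getInstance();
            ArrayList<BaseDocument> batch = new ArrayList<>();

            BaseDocument doc = new BaseDocument();
            doc.setKey("198.51.100.7"); // a document with an existing _key is replaced
            doc.addAttribute("FIRST_FOUND_TIME", 1571245199L);
            batch.add(doc);

            conn.overwrite(batch, "vertex_ip"); // hypothetical collection name
            conn.clean();                       // shut the client down after the batch cycle
        }
    }

Note that overwrite clears the list in its finally block, so the caller can refill the same ArrayList for the next batch.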
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java
new file mode 100644
index 0000000..e3142ae
--- /dev/null
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java
@@ -0,0 +1,67 @@
+package cn.ac.iie.utils;
+
+import cn.ac.iie.config.ApplicationConfig;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.concurrent.*;
+
+/**
+ * Thread pool manager.
+ * @author wlh
+ */
+public class ExecutorThreadPool {
+ private static ExecutorService pool = null;
+ private static ExecutorThreadPool poolExecutor = null;
+
+ static {
+ getThreadPool();
+ }
+
+ private static void getThreadPool(){
+ ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("iplearning-application-pool-%d").build();
+
+ // shared pool: core = THREAD_POOL_NUMBER, max = 2x core, bounded queue of 1024, AbortPolicy on overflow
+ pool = new ThreadPoolExecutor(ApplicationConfig.THREAD_POOL_NUMBER, ApplicationConfig.THREAD_POOL_NUMBER*2,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
+
+// pool = Executors.newFixedThreadPool(ApplicationConfig.THREAD_POOL_NUMBER);
+ }
+
+ public static ExecutorThreadPool getInstance(){
+ if (null == poolExecutor){
+ poolExecutor = new ExecutorThreadPool();
+ }
+ return poolExecutor;
+ }
+
+ public void executor(Runnable command){
+ pool.execute(command);
+ }
+
+ @Deprecated
+ public void awaitThreadTask(){
+ try {
+ while (!pool.awaitTermination(ApplicationConfig.THREAD_AWAIT_TERMINATION_TIME, TimeUnit.SECONDS)) {
+ System.out.println("线程池没有关闭");
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void shutdown(){
+ pool.shutdown();
+ }
+
+ @Deprecated
+ public static Long getThreadNumber(){
+ String name = Thread.currentThread().getName();
+ String[] split = name.split("-");
+ return Long.parseLong(split[3]);
+ }
+
+
+
+}
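
The pool is deliberately bounded: core size THREAD_POOL_NUMBER, twice that at maximum, a queue of 1024, and AbortPolicy, so submitting beyond that capacity fails fast with RejectedExecutionException instead of blocking. A minimal sketch of the submit-and-await pattern BaseArangoData uses (the task body is illustrative):

    import cn.ac.iie.utils.ExecutorThreadPool;

    import java.util.concurrent.CountDownLatch;

    public class PoolUsageSketch {
        public static void main(String[] args) throws InterruptedException {
            ExecutorThreadPool pool = ExecutorThreadPool.getInstance();
            int tasks = 10; // one per time window, matching thread.pool.number
            CountDownLatch latch = new CountDownLatch(tasks);
            for (int i = 0; i < tasks; i++) {
                final int slice = i;
                pool.executor(() -> {
                    try {
                        System.out.println("reading slice " + slice);
                    } finally {
                        latch.countDown(); // count down even if the task fails
                    }
                });
            }
            latch.await(); // block until every slice is done
            pool.shutdown();
        }
    }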
diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
new file mode 100644
index 0000000..87b0bbb
--- /dev/null
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -0,0 +1,25 @@
+# Spark job settings
+spark.sql.shuffle.partitions=144
+spark.sql.read.fetchsize=10000
+spark.executor.memory=120g
+spark.app.name=test
+spark.network.timeout=300s
+repartitionNumber=36
+spark.serializer=org.apache.spark.serializer.KryoSerializer
+master=local[*]
+# Spark-to-ClickHouse read settings
+numPartitions=144
+maxPoolSize=40
+minTime=1571245199
+maxTime=1571284799
+clickhouse.socket.timeout=300000
+# ArangoDB connection settings
+arangoDB.host=192.168.40.127
+arangoDB.port=8529
+arangoDB.user=root
+arangoDB.password=111111
+arangoDB.DB.name=insert_iplearn_index
+arangoDB.batch=100000
+arangoDB.ttl=3600
+
+thread.pool.number=10
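
ApplicationConfig.scala is still an empty stub in this commit, yet the Java classes above already reference constants such as THREAD_POOL_NUMBER, ARANGODB_HOST and ARANGODB_TTL that presumably map onto these properties. A minimal sketch of how such a loader could look, using plain java.util.Properties (the class and the selection of fields are assumptions, not the project's actual implementation):

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    public final class ApplicationConfigSketch {
        public static final int THREAD_POOL_NUMBER;
        public static final String ARANGODB_HOST;
        public static final int ARANGODB_PORT;

        static {
            Properties props = new Properties();
            try (InputStream in = ApplicationConfigSketch.class.getClassLoader()
                    .getResourceAsStream("application.properties")) {
                if (in == null) {
                    throw new IOException("application.properties not found on the classpath");
                }
                props.load(in);
            } catch (IOException e) {
                throw new ExceptionInInitializerError(e);
            }
            THREAD_POOL_NUMBER = Integer.parseInt(props.getProperty("thread.pool.number", "10"));
            ARANGODB_HOST = props.getProperty("arangoDB.host");
            ARANGODB_PORT = Integer.parseInt(props.getProperty("arangoDB.port", "8529"));
        }

        private ApplicationConfigSketch() {}
    }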
diff --git a/ip-learning-spark/src/main/resources/log4j.properties b/ip-learning-spark/src/main/resources/log4j.properties
new file mode 100644
index 0000000..ee350e5
--- /dev/null
+++ b/ip-learning-spark/src/main/resources/log4j.properties
@@ -0,0 +1,25 @@
+######################### logger ##############################
+log4j.logger.org.apache.http=OFF
+log4j.logger.org.apache.http.wire=OFF
+
+#Log4j
+log4j.rootLogger=info,console,file
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# File appender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+# Log file path: relative by default; point it at the deployment's log directory
+#log4j.appender.file.file=/home/ceiec/iplearning/logs/ip-learning-application.log
+#log4j.appender.file.file=/home/ceiec/iplearning/testLog/ip-learning-application.log
+log4j.appender.file.file=./logs/ip-learning-application.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
new file mode 100644
index 0000000..9e72fac
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.config
+
+object ApplicationConfig {
+
+}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
new file mode 100644
index 0000000..3a19be9
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.dao
+
+object BaseClickhouseData {
+
+}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala
new file mode 100644
index 0000000..17385f0
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.main
+
+object IpLearningApplication {
+
+}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
new file mode 100644
index 0000000..c7939fe
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.service.transform
+
+object MergeDataFrame {
+
+}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
new file mode 100644
index 0000000..64bed4d
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.service.update
+
+object UpdateDocHandler {
+
+}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
new file mode 100644
index 0000000..c25c31e
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.service.update
+
+object UpdateDocument {
+
+}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/partition/CustomPartitioner.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/partition/CustomPartitioner.scala
new file mode 100644
index 0000000..a3c26ae
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/partition/CustomPartitioner.scala
@@ -0,0 +1,11 @@
+package cn.ac.iie.spark.partition
+
+import org.apache.spark.Partitioner
+
+class CustomPartitioner(numPartition: Int) extends Partitioner {
+ override def numPartitions: Int = numPartition
+
+ override def getPartition(key: Any): Int = {
+ Math.floorMod(key.hashCode(), numPartition) // floorMod, not Math.abs: abs(Int.MinValue) is still negative
+ }
+}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala
new file mode 100644
index 0000000..ce0f417
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.utils
+
+object SparkSessionUtil {
+
+}