summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-09-12 09:02:39 +0000
committer王宽 <[email protected]>2024-09-12 09:02:39 +0000
commit1dd4e02841f91457b26fea152a3a23146ff60a21 (patch)
tree04052c11df4f806bac0c2ee40d1bddd55e6b2fc7
parent7d4b6d052d9584e38a1c0deec18408f74b07655e (diff)
parent1f9a8e44b70ece223a48b5daf45c2f3696a31d77 (diff)
Merge branch 'feature/cn-location' into 'develop'
Feature/cn location See merge request galaxy/platform/groot-stream!106
-rw-r--r--groot-core/pom.xml13
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java81
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java84
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java12
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/BaseStationKnowledgeBaseHandler.java90
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java69
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java65
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java20
8 files changed, 430 insertions, 4 deletions
diff --git a/groot-core/pom.xml b/groot-core/pom.xml
index e723fa5..e1377c4 100644
--- a/groot-core/pom.xml
+++ b/groot-core/pom.xml
@@ -14,6 +14,19 @@
<dependencies>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.6</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.uber</groupId>
+ <artifactId>h3</artifactId>
+ <version>4.1.1</version>
+ </dependency>
+
+ <dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-netty</artifactId>
<version>5.11.2</version>
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java
new file mode 100644
index 0000000..d506461
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java
@@ -0,0 +1,81 @@
+package com.geedgenetworks.core.udf.cn;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob;
+import com.geedgenetworks.core.udf.knowlegdebase.handler.BaseStationKnowledgeBaseHandler;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Scalar UDF that enriches an event with the coordinates of a base station.
+ * <p>
+ * The value of the configured lookup field (the cell id) is resolved through
+ * {@link BaseStationKnowledgeBaseHandler}. On a hit, the first configured output field
+ * receives the longitude and the second one the latitude; on a miss the event is
+ * returned untouched.
+ *
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/8/19 15:21
+ */
+public class BaseStationLookup extends AbstractKnowledgeScalarFunction {
+
+    private BaseStationKnowledgeBaseHandler knowledgeBaseHandler;
+
+    private String cellIdFieldName;
+
+    private String longitudeFieldName;
+
+    private String latitudeFieldName;
+
+    /**
+     * Delegates to the parent class, then validates the UDF configuration and stores the field names.
+     *
+     * @param runtimeContext Flink runtime context, handed to the parent class
+     * @param udfContext     UDF configuration; must carry exactly one lookup field,
+     *                       exactly two output fields and a non-null parameter map
+     * @throws GrootStreamRuntimeException (ILLEGAL_ARGUMENT) when one of those requirements is not met
+     */
+    @Override
+    public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+        super.open(runtimeContext, udfContext);
+
+        List<String> lookupFields = udfContext.getLookup_fields();
+        List<String> outputFields = udfContext.getOutput_fields();
+        boolean incomplete = lookupFields == null
+                || outputFields == null
+                || udfContext.getParameters() == null;
+        if (incomplete) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+        }
+        if (lookupFields.size() != 1) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "lookup_fields must contain field: cell_id");
+        }
+        if (outputFields.size() != 2) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "output_fields must contain two fields: longitude and latitude");
+        }
+
+        // Output order is fixed: longitude first, latitude second.
+        cellIdFieldName = lookupFields.get(0);
+        longitudeFieldName = outputFields.get(0);
+        latitudeFieldName = outputFields.get(1);
+    }
+
+    /**
+     * Writes longitude and latitude into the event when its cell id is known to the knowledge base.
+     *
+     * @param event event to enrich; its extracted-field map is modified in place
+     * @return the same event instance, enriched or unchanged
+     */
+    @Override
+    public Event evaluate(Event event) {
+        Map<String, Object> fields = event.getExtractedFields();
+        if (fields == null) {
+            return event;
+        }
+        Object cellId = fields.get(cellIdFieldName);
+        if (cellId == null) {
+            return event;
+        }
+        BaseStationKnowledgeBaseHandler.BaseStationLocation location = knowledgeBaseHandler.lookup(cellId.toString());
+        if (location == null) {
+            // Unknown cell id: leave the event as it is.
+            return event;
+        }
+        fields.put(longitudeFieldName, location.getLongitude());
+        fields.put(latitudeFieldName, location.getLatitude());
+        return event;
+    }
+
+    /**
+     * @return the name under which this function is referenced in job configuration
+     */
+    @Override
+    public String functionName() {
+        return "BASE_STATION_LOOKUP";
+    }
+
+    /**
+     * Nothing to release in this class.
+     */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Obtains the shared handler and registers it with the update job, using the first
+     * knowledge base configuration held by the parent class.
+     */
+    @Override
+    protected void registerKnowledges() {
+        knowledgeBaseHandler = BaseStationKnowledgeBaseHandler.getInstance();
+        KnowledgeBaseUpdateJob.registerKnowledgeBase(knowledgeBaseHandler, knowledgeBaseConfigs.get(0));
+    }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java
new file mode 100644
index 0000000..2698772
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java
@@ -0,0 +1,84 @@
+package com.geedgenetworks.core.udf.cn;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.ScalarFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.uber.h3core.H3Core;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Scalar UDF that converts a longitude/latitude pair into an H3 cell address.
+ * <p>
+ * Configuration: {@code lookup_fields} holds the longitude field followed by the latitude
+ * field, {@code output_fields} holds the field that receives the cell address, and
+ * {@code parameters.resolution} is an integer between 0 and 15.
+ *
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/8/19 14:50
+ */
+public class H3CellLookup implements ScalarFunction {
+
+    /** Key of the mandatory resolution parameter. */
+    private static final String RESOLUTION_KEY = "resolution";
+
+    /** Largest accepted resolution; the smallest accepted value is 0. */
+    private static final int MAX_RESOLUTION = 15;
+
+    /** Number of lookup fields: longitude and latitude. */
+    private static final int LOOKUP_FIELDS_SIZE = 2;
+
+    private String longitudeFieldName;
+
+    private String latitudeFieldName;
+
+    private String outputFieldName;
+
+    private int res;
+
+    private H3Core h3;
+
+    /**
+     * Validates the UDF configuration and creates the H3 instance.
+     *
+     * @param runtimeContext Flink runtime context (not used by this function)
+     * @param udfContext     UDF configuration, see the class description
+     * @throws GrootStreamRuntimeException ILLEGAL_ARGUMENT for an invalid configuration,
+     *                                     FILE_OPERATION_ERROR when the H3 instance cannot be created
+     */
+    @Override
+    public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+        List<String> lookupFields = udfContext.getLookup_fields();
+        List<String> outputFields = udfContext.getOutput_fields();
+        Map<String, Object> parameters = udfContext.getParameters();
+        if (lookupFields == null || outputFields == null || parameters == null) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+        }
+
+        if (lookupFields.size() != LOOKUP_FIELDS_SIZE) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "lookup_fields must contain two fields: longitude and latitude");
+        }
+
+        // An empty list used to fail later with IndexOutOfBoundsException on get(0).
+        // Extra entries are still tolerated (only the first one is used) to stay backward-compatible.
+        if (outputFields.isEmpty()) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "output_fields must contain the field that receives the H3 cell address");
+        }
+
+        // A missing key yields null, which fails the instanceof check just like a non-Integer value.
+        Object resolution = parameters.get(RESOLUTION_KEY);
+        if (!(resolution instanceof Integer) || (Integer) resolution < 0 || (Integer) resolution > MAX_RESOLUTION) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must contain key resolution and the value is between 0 and 15");
+        }
+
+        longitudeFieldName = lookupFields.get(0);
+        latitudeFieldName = lookupFields.get(1);
+        outputFieldName = outputFields.get(0);
+        res = (Integer) resolution;
+        try {
+            h3 = H3Core.newInstance();
+        } catch (IOException io) {
+            // Keep the underlying reason visible; it used to be dropped entirely.
+            throw new GrootStreamRuntimeException(CommonErrorCode.FILE_OPERATION_ERROR, "Failed to create H3Core instance: " + io.getMessage());
+        }
+    }
+
+    /**
+     * Computes the H3 cell address for the event's coordinates and stores it in the output field.
+     * <p>
+     * The event is returned unchanged when it has no extracted fields, when either coordinate
+     * is missing or not numeric, or when either coordinate is NaN or infinite.
+     *
+     * @param event event to enrich; its extracted-field map is modified in place
+     * @return the same event instance
+     */
+    @Override
+    public Event evaluate(Event event) {
+        Map<String, Object> extractedFields = event.getExtractedFields();
+        if (extractedFields == null) {
+            return event;
+        }
+        Object longitudeValue = extractedFields.get(longitudeFieldName);
+        Object latitudeValue = extractedFields.get(latitudeFieldName);
+        // Accept every numeric type (Integer, Long, Float, BigDecimal, ...), not only Double;
+        // instanceof is false for null, so absent values are skipped here too.
+        if (!(longitudeValue instanceof Number) || !(latitudeValue instanceof Number)) {
+            return event;
+        }
+        double longitude = ((Number) longitudeValue).doubleValue();
+        double latitude = ((Number) latitudeValue).doubleValue();
+        // NaN/Infinity cannot be mapped to a cell; skip the record instead of letting the
+        // H3 call fail (H3 is expected to reject non-finite input - see the H3 API reference).
+        if (!Double.isFinite(longitude) || !Double.isFinite(latitude)) {
+            return event;
+        }
+        // H3 takes latitude first, then longitude.
+        String cellAddress = h3.latLngToCellAddress(latitude, longitude, res);
+        extractedFields.put(outputFieldName, cellAddress);
+        return event;
+    }
+
+    /**
+     * @return the name under which this function is referenced in job configuration
+     */
+    @Override
+    public String functionName() {
+        return "H3_CELL_LOOKUP";
+    }
+
+    /**
+     * Nothing to release in this class.
+     */
+    @Override
+    public void close() {
+    }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
index 6ac292c..f66cbe0 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
@@ -31,6 +31,13 @@ public abstract class AbstractSingleKnowledgeBaseHandler extends AbstractKnowled
if ("http".equals(knowledgeBaseConfig.getFsType())) {
this.knowledgeMetedataCache = getMetadata(knowledgeBaseConfig.getFsType(), knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(0));
}
+ if ("local".equals(knowledgeBaseConfig.getFsType())) {
+ knowledgeMetedataCache = new KnowLedgeBaseFileMeta();
+ knowledgeMetedataCache.setIsValid(1);
+ knowledgeMetedataCache.setFormat(knowledgeBaseConfig.getFiles().get(0).substring(knowledgeBaseConfig.getFiles().get(0).lastIndexOf(".") + 1));
+ knowledgeMetedataCache.setName(knowledgeBaseConfig.getFiles().get(0));
+ knowledgeMetedataCache.setPath(knowledgeBaseConfig.getFsPath() + knowledgeBaseConfig.getFiles().get(0));
+ }
return buildKnowledgeBase();
}
@@ -81,11 +88,8 @@ public abstract class AbstractSingleKnowledgeBaseHandler extends AbstractKnowled
case "aes":
result = AESUtil.decrypt(data, AES_KEY);
break;
- case "csv":
- result = data;
- break;
default:
- logger.error("unknown format: " + knowledgeMetedataCache.getFormat());
+ result = data;
}
} catch (Exception e) {
logger.error("decrypt error", e);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/BaseStationKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/BaseStationKnowledgeBaseHandler.java
new file mode 100644
index 0000000..0eeae53
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/BaseStationKnowledgeBaseHandler.java
@@ -0,0 +1,90 @@
+package com.geedgenetworks.core.udf.knowlegdebase.handler;
+
+import com.geedgenetworks.core.utils.cn.csv.HighCsvReader;
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Singleton knowledge base that maps a cell id to the location of its base station.
+ * <p>
+ * The content is a CSV file with the columns {@code cell_id}, {@code longitude} and
+ * {@code latitude}. Each rebuild fills a fresh map and then swaps the reference, so a
+ * lookup sees either the previous or the new content, never a half-built map.
+ *
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/8/26 16:27
+ */
+public class BaseStationKnowledgeBaseHandler extends AbstractSingleKnowledgeBaseHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(BaseStationKnowledgeBaseHandler.class);
+
+    private static final String CELL_ID_COLUMN = "cell_id";
+
+    private static final String LONGITUDE_COLUMN = "longitude";
+
+    private static final String LATITUDE_COLUMN = "latitude";
+
+    // volatile: the reference is replaced on rebuild/close while lookups read it.
+    // NOTE(review): assumes rebuilds may run on a different thread than lookups - confirm against KnowledgeBaseUpdateJob.
+    private volatile HashMap<String, BaseStationLocation> baseStationLocationHashMap = new HashMap<>();
+
+    private BaseStationKnowledgeBaseHandler() {
+    }
+
+    /** Holder class: the instance is created when {@link #getInstance()} is first called. */
+    private static final class InstanceHolder {
+        private static final BaseStationKnowledgeBaseHandler instance = new BaseStationKnowledgeBaseHandler();
+    }
+
+    /**
+     * @return the single shared handler instance
+     */
+    public static BaseStationKnowledgeBaseHandler getInstance() {
+        return BaseStationKnowledgeBaseHandler.InstanceHolder.instance;
+    }
+
+    /**
+     * Downloads the CSV file and rebuilds the cell-id index from it.
+     * <p>
+     * A row with a missing cell id or a missing, unparseable or non-finite coordinate is
+     * logged and skipped; it no longer ends up in the index as location (0, 0).
+     *
+     * @return {@code true} when the index was replaced, {@code false} when the update failed
+     *         (the previous index is kept in that case)
+     */
+    @Override
+    public Boolean buildKnowledgeBase() {
+        try {
+            byte[] content = downloadFile();
+            if (content == null) {
+                logger.error("{} update error: no content downloaded", this.getClass().getSimpleName());
+                return false;
+            }
+            List<String> needColumns = new ArrayList<>();
+            needColumns.add(CELL_ID_COLUMN);
+            needColumns.add(LONGITUDE_COLUMN);
+            needColumns.add(LATITUDE_COLUMN);
+            // Decode with an explicit charset instead of the platform default.
+            // NOTE(review): assumes the file is UTF-8 (or plain ASCII) - confirm with the data provider.
+            InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(content), java.nio.charset.StandardCharsets.UTF_8);
+            HighCsvReader highCsvReader = new HighCsvReader(reader, needColumns);
+            // Pre-size for the reported line count at the default load factor to avoid rehashing.
+            HashMap<String, BaseStationLocation> newBaseStationLocationHashMap = new HashMap<>((int) (highCsvReader.getLineNumber() / 0.75F + 1.0F));
+            HighCsvReader.CsvIterator iterator = highCsvReader.getIterator();
+            while (iterator.hasNext()) {
+                Map<String, String> line = iterator.next();
+                try {
+                    String cellId = line.get(CELL_ID_COLUMN);
+                    if (cellId == null || cellId.isEmpty()) {
+                        throw new IllegalArgumentException(CELL_ID_COLUMN + " is missing");
+                    }
+                    BaseStationLocation baseStationLocation = new BaseStationLocation();
+                    baseStationLocation.setLongitude(parseDouble(LONGITUDE_COLUMN, line.get(LONGITUDE_COLUMN)));
+                    baseStationLocation.setLatitude(parseDouble(LATITUDE_COLUMN, line.get(LATITUDE_COLUMN)));
+                    newBaseStationLocationHashMap.put(cellId, baseStationLocation);
+                } catch (Exception lineException) {
+                    // One bad row must not abort the whole update.
+                    logger.error("{} line: {} parse error", this.getClass().getSimpleName(), line, lineException);
+                }
+            }
+            // Publish the finished map in a single reference swap.
+            baseStationLocationHashMap = newBaseStationLocationHashMap;
+        } catch (Exception e) {
+            logger.error(this.getClass().getSimpleName() + " update error", e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Looks up the location stored for a cell id.
+     *
+     * @param ecgi cell id, as found in the {@code cell_id} column
+     * @return the stored location, or {@code null} when the id is unknown
+     */
+    public BaseStationLocation lookup(String ecgi) {
+        return baseStationLocationHashMap.get(ecgi);
+    }
+
+    /**
+     * Drops the loaded content.
+     * <p>
+     * The index is replaced by an empty map instead of being cleared and set to {@code null}:
+     * this instance is a shared singleton, and a later {@link #lookup(String)} must return
+     * {@code null} rather than throw a {@link NullPointerException}.
+     */
+    public void close() {
+        baseStationLocationHashMap = new HashMap<>();
+    }
+
+    /**
+     * Parses one coordinate column.
+     *
+     * @param column column name, used in the error message only
+     * @param str    raw column value
+     * @return the parsed, finite value
+     * @throws IllegalArgumentException when the value is missing, not a number, or not finite
+     */
+    private double parseDouble(String column, String str) {
+        if (str == null) {
+            throw new IllegalArgumentException(column + " is missing");
+        }
+        // NumberFormatException is an IllegalArgumentException, so it propagates as documented.
+        double value = Double.parseDouble(str.trim());
+        if (Double.isNaN(value) || Double.isInfinite(value)) {
+            throw new IllegalArgumentException(column + " is not a finite number: " + str);
+        }
+        return value;
+    }
+
+    /** Longitude/latitude pair of one base station. */
+    @Data
+    public static final class BaseStationLocation {
+        private double longitude;
+        private double latitude;
+    }
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java
new file mode 100644
index 0000000..db52b3e
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java
@@ -0,0 +1,69 @@
+package com.geedgenetworks.core.udf.cn;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.geedgenetworks.core.udf.cn.LookupTestUtils.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit test for {@link BaseStationLookup}, run against a mocked knowledge base that
+ * contains a single base station.
+ *
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/9/2 15:20
+ */
+class BaseStationLookupTest {
+
+    private static BaseStationLookup baseStationLookup;
+
+    /**
+     * Opens the function once for the whole class with a mocked runtime context and
+     * a mocked knowledge base download.
+     */
+    @BeforeAll
+    static void setUp() {
+        /*
+         * Create a UDFContext object and set the parameters
+         * e.g.:
+         *   - function: BASE_STATION_LOOKUP
+         *     lookup_fields: [ cell_id ]
+         *     output_fields: [ subscriber_longitude,subscriber_latitude ]
+         *     parameters:
+         *       kb_name: base_station_location
+         */
+        UDFContext udfContext = new UDFContext();
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("kb_name", kbName);
+        udfContext.setParameters(parameters);
+        udfContext.setLookup_fields(Arrays.asList("cell_id"));
+        udfContext.setOutput_fields(Arrays.asList("subscriber_longitude", "subscriber_latitude"));
+
+        RuntimeContext runtimeContext = mockRuntimeContext();
+
+        // CSV served by the mocked handler: header plus one base station.
+        String content = "cell_id,longitude,latitude\n460-11-630947-1,93.9290001,42.66884";
+        mockKnowledgeBaseHandler(content);
+
+        baseStationLookup = new BaseStationLookup();
+        baseStationLookup.open(runtimeContext, udfContext);
+    }
+
+    /**
+     * A known cell id must be enriched with the longitude and latitude from the CSV row.
+     */
+    @Test
+    void evaluate() {
+        Event event = new Event();
+        Map<String, Object> fields = new HashMap<>();
+        fields.put("cell_id", "460-11-630947-1");
+        fields.put("cell_type", 1L);
+        event.setExtractedFields(fields);
+        Event evaluate = baseStationLookup.evaluate(event);
+        assertEquals(93.9290001, evaluate.getExtractedFields().get("subscriber_longitude"));
+        assertEquals(42.66884, evaluate.getExtractedFields().get("subscriber_latitude"));
+    }
+
+    /**
+     * Tears down once per class, mirroring the once-per-class set-up. With the previous
+     * {@code @AfterEach}, the state prepared in {@link #setUp()} would already be cleared
+     * before a second test method could run.
+     */
+    @AfterAll
+    static void afterAll() {
+        // NOTE(review): clearState() comes from LookupTestUtils; presumably it releases the static mocks - confirm.
+        clearState();
+    }
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java
new file mode 100644
index 0000000..641f58a
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java
@@ -0,0 +1,65 @@
+package com.geedgenetworks.core.udf.cn;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit test for {@link H3CellLookup}: a fixed coordinate must map to the expected
+ * H3 cell address at resolution 9.
+ *
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/9/6 16:15
+ */
+public class H3CellLookupTest {
+
+    private static H3CellLookup h3CellLookup;
+
+    /**
+     * Opens the function once for the whole class. The runtime context is a plain
+     * Mockito mock.
+     */
+    @BeforeAll
+    static void setUp() {
+        /*
+         * Create a UDFContext object and set the parameters
+         * e.g.:
+         *   - function: H3_CELL_LOOKUP
+         *     lookup_fields: [ subscriber_longitude,subscriber_latitude ]
+         *     output_fields: [ first_location ]
+         *     parameters:
+         *       resolution: 9
+         */
+        Map<String, Object> params = new HashMap<>();
+        params.put("resolution", 9);
+
+        UDFContext context = new UDFContext();
+        context.setLookup_fields(Arrays.asList("subscriber_longitude", "subscriber_latitude"));
+        context.setOutput_fields(Arrays.asList("first_location"));
+        context.setParameters(params);
+
+        RuntimeContext mockedRuntime = Mockito.mock(RuntimeContext.class);
+
+        h3CellLookup = new H3CellLookup();
+        h3CellLookup.open(mockedRuntime, context);
+    }
+
+    /**
+     * Checks the cell address computed for longitude 116.390249 / latitude 39.905392.
+     */
+    @Test
+    void testValueExpression() throws IOException {
+        Map<String, Object> input = new HashMap<>();
+        input.put("subscriber_longitude", 116.390249);
+        input.put("subscriber_latitude", 39.905392);
+
+        Event event = new Event();
+        event.setExtractedFields(input);
+
+        Event result = h3CellLookup.evaluate(event);
+        assertEquals("8931aa42853ffff", result.getExtractedFields().get("first_location"));
+    }
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java
index 05df41d..bf95c57 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java
@@ -47,6 +47,20 @@ public class LookupTestUtils {
private static MockedStatic<AbstractMultipleKnowledgeBaseHandler> abstractMultipleKnowledgeBaseHandlerMockedStatic = mockStatic(AbstractMultipleKnowledgeBaseHandler.class);
+ /**
+ * Mocks the runtime context.
+ * The configuration is stored in the global job parameters and includes the knowledge base configuration.
+ * A knowledge base configuration consists of the knowledge base name, the file system type,
+ * the file system path and the file list.
+ * e.g.:
+ * - name: cn_ip_location
+ * fs_type: http
+ * fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ * files:
+ * - 1
+ *
+ * @return runtime context
+ */
static RuntimeContext mockRuntimeContext() {
RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class);
ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class);
@@ -77,6 +91,12 @@ public class LookupTestUtils {
}
}
+ /**
+ * mock knowledge base handler
+ * the knowledge base handler is used to get the metadata and read the file content
+ *
+ * @param downloadContent download content
+ */
static void mockKnowledgeBaseHandler(String downloadContent) {
checkStaticMock();
KnowLedgeBaseFileMeta knowLedgeBaseFileMeta = new KnowLedgeBaseFileMeta();