From 5645d5cb004ac1a70e8c59598f7da158234ca2fb Mon Sep 17 00:00:00 2001 From: gujinkai Date: Tue, 3 Sep 2024 15:41:17 +0800 Subject: [improve][core]CN add functions: baseStationLookup and h3CellLookup --- groot-core/pom.xml | 6 ++ .../core/udf/cn/BaseStationLookup.java | 81 +++++++++++++++++++ .../geedgenetworks/core/udf/cn/H3CellLookup.java | 84 ++++++++++++++++++++ .../handler/BaseStationKnowledgeBaseHandler.java | 90 ++++++++++++++++++++++ .../core/udf/cn/BaseStationLookupTest.java | 69 +++++++++++++++++ .../core/udf/cn/H3CellLookupTest.java | 65 ++++++++++++++++ .../core/udf/cn/LookupTestUtils.java | 20 +++++ 7 files changed, 415 insertions(+) create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/BaseStationKnowledgeBaseHandler.java create mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java create mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java diff --git a/groot-core/pom.xml b/groot-core/pom.xml index e723fa5..52448c9 100644 --- a/groot-core/pom.xml +++ b/groot-core/pom.xml @@ -13,6 +13,12 @@ + + com.uber + h3 + 4.1.1 + + org.mock-server mockserver-netty diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java new file mode 100644 index 0000000..d506461 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java @@ -0,0 +1,81 @@ +package com.geedgenetworks.core.udf.cn; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.UDFContext; +import 
com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; +import com.geedgenetworks.core.udf.knowlegdebase.handler.BaseStationKnowledgeBaseHandler; +import org.apache.flink.api.common.functions.RuntimeContext; + +import java.util.List; +import java.util.Map; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/8/19 15:21 + */ +public class BaseStationLookup extends AbstractKnowledgeScalarFunction { + + private String cellIdFieldName; + + private String longitudeFieldName; + + private String latitudeFieldName; + + private BaseStationKnowledgeBaseHandler knowledgeBaseHandler; + + @Override + public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + super.open(runtimeContext, udfContext); + if (udfContext.getLookup_fields() == null || udfContext.getOutput_fields() == null || udfContext.getParameters() == null) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + + int lookupFieldsSize = 1; + if (udfContext.getLookup_fields().size() != lookupFieldsSize) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "lookup_fields must contain field: cell_id"); + } + + int outputFieldsSize = 2; + if (udfContext.getOutput_fields().size() != outputFieldsSize) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "output_fields must contain two fields: longitude and latitude"); + } + + List lookupFields = udfContext.getLookup_fields(); + cellIdFieldName = lookupFields.get(0); + List outputFields = udfContext.getOutput_fields(); + longitudeFieldName = outputFields.get(0); + latitudeFieldName = outputFields.get(1); + } + + @Override + public Event evaluate(Event event) { + Map extractedFields = event.getExtractedFields(); + if (extractedFields == null || extractedFields.get(cellIdFieldName) == null) { + return event; + } + BaseStationKnowledgeBaseHandler.BaseStationLocation lookup = 
knowledgeBaseHandler.lookup(extractedFields.get(cellIdFieldName).toString()); + if (lookup != null) { + extractedFields.put(longitudeFieldName, lookup.getLongitude()); + extractedFields.put(latitudeFieldName, lookup.getLatitude()); + } + return event; + } + + @Override + public String functionName() { + return "BASE_STATION_LOOKUP"; + } + + @Override + public void close() { + } + + @Override + protected void registerKnowledges() { + knowledgeBaseHandler = BaseStationKnowledgeBaseHandler.getInstance(); + KnowledgeBaseUpdateJob.registerKnowledgeBase(knowledgeBaseHandler, knowledgeBaseConfigs.get(0)); + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java new file mode 100644 index 0000000..2698772 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java @@ -0,0 +1,84 @@ +package com.geedgenetworks.core.udf.cn; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.ScalarFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.uber.h3core.H3Core; +import org.apache.flink.api.common.functions.RuntimeContext; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/8/19 14:50 + */ +public class H3CellLookup implements ScalarFunction { + + private String longitudeFieldName; + + private String latitudeFieldName; + + private String outputFieldName; + + private int res; + + private H3Core h3; + + + @Override + public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + if (udfContext.getLookup_fields() == null || udfContext.getOutput_fields() == null || udfContext.getParameters() == null) { + throw new 
GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + + int lookupFieldsSize = 2; + if (udfContext.getLookup_fields().size() != lookupFieldsSize) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "lookup_fields must contain two fields: longitude and latitude"); + } + + String resolution = "resolution"; + int maxResolution = 15; + if (!udfContext.getParameters().containsKey(resolution) || !(udfContext.getParameters().get(resolution) instanceof Integer) || (int) udfContext.getParameters().get(resolution) < 0 || (int) udfContext.getParameters().get(resolution) > maxResolution) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must contain key resolution and the value is between 0 and 15"); + } + + List lookupFields = udfContext.getLookup_fields(); + longitudeFieldName = lookupFields.get(0); + latitudeFieldName = lookupFields.get(1); + outputFieldName = udfContext.getOutput_fields().get(0); + res = (int) udfContext.getParameters().get("resolution"); + try { + h3 = H3Core.newInstance(); + } catch (IOException io) { + throw new GrootStreamRuntimeException(CommonErrorCode.FILE_OPERATION_ERROR, "Failed to create H3Core instance"); + } + } + + @Override + public Event evaluate(Event event) { + Map extractedFields = event.getExtractedFields(); + if (extractedFields == null || extractedFields.get(longitudeFieldName) == null || extractedFields.get(latitudeFieldName) == null) { + return event; + } + if (!(extractedFields.get(longitudeFieldName) instanceof Double) || !(extractedFields.get(latitudeFieldName) instanceof Double)) { + return event; + } + String cellAddress = h3.latLngToCellAddress((double) extractedFields.get(latitudeFieldName), (double) extractedFields.get(longitudeFieldName), res); + extractedFields.put(outputFieldName, cellAddress); + return event; + } + + @Override + public String functionName() { + return "H3_CELL_LOOKUP"; + } + + @Override + public 
void close() { + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/BaseStationKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/BaseStationKnowledgeBaseHandler.java new file mode 100644 index 0000000..0eeae53 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/BaseStationKnowledgeBaseHandler.java @@ -0,0 +1,90 @@ +package com.geedgenetworks.core.udf.knowlegdebase.handler; + +import com.geedgenetworks.core.utils.cn.csv.HighCsvReader; +import lombok.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/8/26 16:27 + */ +public class BaseStationKnowledgeBaseHandler extends AbstractSingleKnowledgeBaseHandler { + + private static final Logger logger = LoggerFactory.getLogger(BaseStationKnowledgeBaseHandler.class); + + private HashMap baseStationLocationHashMap = new HashMap<>(); + + private BaseStationKnowledgeBaseHandler() { + } + + private static final class InstanceHolder { + private static final BaseStationKnowledgeBaseHandler instance = new BaseStationKnowledgeBaseHandler(); + } + + public static BaseStationKnowledgeBaseHandler getInstance() { + return BaseStationKnowledgeBaseHandler.InstanceHolder.instance; + } + + @Override + public Boolean buildKnowledgeBase() { + try { + List needColumns = new ArrayList<>(); + needColumns.add("cell_id"); + needColumns.add("longitude"); + needColumns.add("latitude"); + byte[] content = downloadFile(); + HighCsvReader highCsvReader = new HighCsvReader(new InputStreamReader(new ByteArrayInputStream(content)), needColumns); + HashMap newBaseStationLocationHashMap = new HashMap<>((int) (highCsvReader.getLineNumber() / 0.75F + 1.0F)); + 
HighCsvReader.CsvIterator iterator = highCsvReader.getIterator(); + while (iterator.hasNext()) { + Map line = iterator.next(); + try { + String cellId = line.get("cell_id"); + BaseStationLocation baseStationLocation = new BaseStationLocation(); + baseStationLocation.setLongitude(parseDouble(line.get("longitude"))); + baseStationLocation.setLatitude(parseDouble(line.get("latitude"))); + newBaseStationLocationHashMap.put(cellId, baseStationLocation); + } catch (Exception lineException) { + logger.error(this.getClass().getSimpleName() + " line: " + line.toString() + " parse error:" + lineException, lineException); + } + } + baseStationLocationHashMap = newBaseStationLocationHashMap; + } catch (Exception e) { + logger.error(this.getClass().getSimpleName() + " update error", e); + return false; + } + return true; + } + + public BaseStationLocation lookup(String ecgi) { + return baseStationLocationHashMap.get(ecgi); + } + + public void close() { + baseStationLocationHashMap.clear(); + baseStationLocationHashMap = null; + } + + private double parseDouble(String str) { + try { + return Double.parseDouble(str); + } catch (Exception e) { + return 0; + } + } + + @Data + public static final class BaseStationLocation { + private double longitude; + private double latitude; + } +} \ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java new file mode 100644 index 0000000..db52b3e --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java @@ -0,0 +1,69 @@ +package com.geedgenetworks.core.udf.cn; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import 
java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static com.geedgenetworks.core.udf.cn.LookupTestUtils.*; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/9/2 15:20 + */ +class BaseStationLookupTest { + + private static BaseStationLookup baseStationLookup; + + @BeforeAll + static void setUp() { + /** + * Create a UDFContext object and set the parameters + * e.g.: + * - function: BASE_STATION_LOOKUP + * lookup_fields: [ cell_id ] + * output_fields: [ subscriber_longitude,subscriber_latitude ] + * parameters: + * kb_name: base_station_location + */ + UDFContext udfContext = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("kb_name", kbName); + udfContext.setParameters(parameters); + udfContext.setLookup_fields(Arrays.asList("cell_id")); + udfContext.setOutput_fields(Arrays.asList("subscriber_longitude", "subscriber_latitude")); + + RuntimeContext runtimeContext = mockRuntimeContext(); + + String content = "cell_id,longitude,latitude\n460-11-630947-1,93.9290001,42.66884"; + mockKnowledgeBaseHandler(content); + + baseStationLookup = new BaseStationLookup(); + baseStationLookup.open(runtimeContext, udfContext); + } + + @Test + void evaluate() { + Event event = new Event(); + Map fields = new HashMap<>(); + fields.put("cell_id", "460-11-630947-1"); + fields.put("cell_type", 1L); + event.setExtractedFields(fields); + Event evaluate = baseStationLookup.evaluate(event); + assertEquals(93.9290001, evaluate.getExtractedFields().get("subscriber_longitude")); + assertEquals(42.66884, evaluate.getExtractedFields().get("subscriber_latitude")); + } + + @AfterEach + void afterAll() { + clearState(); + } +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java new file mode 100644 index 0000000..641f58a --- /dev/null +++ 
b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java @@ -0,0 +1,65 @@ +package com.geedgenetworks.core.udf.cn; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/9/6 16:15 + */ +public class H3CellLookupTest { + + + private static H3CellLookup h3CellLookup; + + @BeforeAll + static void setUp() { + /** + * Create a UDFContext object and set the parameters + * e.g.: + * - function: H3_CELL_LOOKUP + * lookup_fields: [ subscriber_longitude,subscriber_latitude ] + * output_fields: [ first_location ] + * parameters: + * resolution: 9 + */ + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(Arrays.asList("subscriber_longitude", "subscriber_latitude")); + udfContext.setOutput_fields(Arrays.asList("first_location")); + Map parameters = new HashMap<>(); + parameters.put("resolution", 9); + udfContext.setParameters(parameters); + + RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class); + + /*String content = "cell_id,longitude,latitude\n460-11-630947-1,93.9290001,42.66884"; + mockKnowledgeBaseHandler(content);*/ + + h3CellLookup = new H3CellLookup(); + h3CellLookup.open(runtimeContext, udfContext); + } + + @Test + void testValueExpression() throws IOException { + Event event = new Event(); + Map fields = new HashMap<>(); + fields.put("subscriber_longitude", 116.390249); + fields.put("subscriber_latitude", 39.905392); + event.setExtractedFields(fields); + Event evaluate = h3CellLookup.evaluate(event); + assertEquals("8931aa42853ffff", 
evaluate.getExtractedFields().get("first_location")); + + } +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java index 05df41d..bf95c57 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java @@ -47,6 +47,20 @@ public class LookupTestUtils { private static MockedStatic abstractMultipleKnowledgeBaseHandlerMockedStatic = mockStatic(AbstractMultipleKnowledgeBaseHandler.class); + /** + * Mocks the runtime context. + * The configuration is set in the global job parameters. + * The configuration contains the knowledge base configuration. + * The knowledge base configuration contains the knowledge base name, file type, file path and file list, + * e.g.: + * - name: cn_ip_location + * fs_type: http + * fs_path: http://192.168.44.55:9999/v1/knowledge_base + * files: + * - 1 + * + * @return runtime context + */ static RuntimeContext mockRuntimeContext() { RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class); ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class); @@ -77,6 +91,12 @@ public class LookupTestUtils { } } + /** + * Mocks the knowledge base handler. + * The knowledge base handler is used to get the metadata and read the file content. + * + * @param downloadContent download content + */ static void mockKnowledgeBaseHandler(String downloadContent) { checkStaticMock(); KnowLedgeBaseFileMeta knowLedgeBaseFileMeta = new KnowLedgeBaseFileMeta(); -- cgit v1.2.3