summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2024-10-17 18:26:38 +0800
committerlifengchao <[email protected]>2024-10-17 18:26:38 +0800
commit5765edf67121f9291cebb9bf173d1627e7c7e9ca (patch)
tree43342ba4e1cdd957e09dc8753f22a981f02fdcff
parentb754e83ba07ea68d0f02673b3c1bb3ac997c92e8 (diff)
TSG-22756 添加current_timestamp_millis udf输出当前时间戳develop
-rw-r--r--druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java4
-rw-r--r--druid-udf/src/main/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprMacro.java57
-rw-r--r--druid-udf/src/main/java/org/apache/druid/query/udf/sql/CurrentTimestampMillisOperatorConversion.java35
-rw-r--r--druid-udf/src/test/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprTest.java148
4 files changed, 244 insertions, 0 deletions
diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java b/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java
index d09e02a..51248c4 100644
--- a/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java
+++ b/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java
@@ -3,7 +3,9 @@ package org.apache.druid.query.udf;
import com.google.inject.Binder;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.query.udf.expressions.CurrentTimestampMillisExprMacro;
import org.apache.druid.query.udf.expressions.DimensionBucketExprMacro;
+import org.apache.druid.query.udf.sql.CurrentTimestampMillisOperatorConversion;
import org.apache.druid.query.udf.sql.DimensionBucketOperatorConversion;
import org.apache.druid.sql.guice.SqlBindings;
@@ -11,7 +13,9 @@ public class UdfModule implements DruidModule {
@Override
public void configure(Binder binder) {
SqlBindings.addOperatorConversion(binder, DimensionBucketOperatorConversion.class);
+ SqlBindings.addOperatorConversion(binder, CurrentTimestampMillisOperatorConversion.class);
ExpressionModule.addExprMacro(binder, DimensionBucketExprMacro.class);
+ ExpressionModule.addExprMacro(binder, CurrentTimestampMillisExprMacro.class);
}
/*@Override
diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprMacro.java b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprMacro.java
new file mode 100644
index 0000000..9535223
--- /dev/null
+++ b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprMacro.java
@@ -0,0 +1,57 @@
+package org.apache.druid.query.udf.expressions;
+
+import org.apache.druid.math.expr.*;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class CurrentTimestampMillisExprMacro implements ExprMacroTable.ExprMacro {
+ private static final String NAME = "current_timestamp_millis"; // current_timestamp_millis
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public Expr apply(List<Expr> args) {
+ validationHelperCheckArgumentCount(args, 0);
+
+ class CurrentTimestampMillisExpr implements Expr {
+
+ @Override
+ public ExprEval eval(ObjectBinding bindings) {
+ return ExprEval.of(System.currentTimeMillis());
+ }
+
+ @Override
+ public String stringify() {
+ return "current_timestamp_millis";
+ }
+
+ @Override
+ public Expr visit(Shuttle shuttle) {
+ return shuttle.visit(this);
+ }
+
+ @Override
+ public BindingAnalysis analyzeInputs() {
+ return BindingAnalysis.EMTPY;
+ }
+
+ @Nullable
+ @Override
+ public ExpressionType getOutputType(InputBindingInspector inspector) {
+ return ExpressionType.LONG;
+ }
+
+ @Override
+ public boolean canVectorize(InputBindingInspector inspector) {
+ return false;
+ }
+ }
+
+ return new CurrentTimestampMillisExpr();
+ }
+
+}
diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/sql/CurrentTimestampMillisOperatorConversion.java b/druid-udf/src/main/java/org/apache/druid/query/udf/sql/CurrentTimestampMillisOperatorConversion.java
new file mode 100644
index 0000000..f75f5c7
--- /dev/null
+++ b/druid-udf/src/main/java/org/apache/druid/query/udf/sql/CurrentTimestampMillisOperatorConversion.java
@@ -0,0 +1,35 @@
+package org.apache.druid.query.udf.sql;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.*;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.OperatorConversions;
+import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+
+import javax.annotation.Nullable;
+
+public class CurrentTimestampMillisOperatorConversion implements SqlOperatorConversion {
+ private static final SqlFunction SQL_FUNCTION = OperatorConversions
+ .operatorBuilder("CURRENT_TIMESTAMP_MILLIS")
+ .operandTypes(SqlTypeFamily.ANY)
+ .requiredOperands(0)
+ .returnTypeNonNull(SqlTypeName.BIGINT)
+ .functionCategory(SqlFunctionCategory.USER_DEFINED_FUNCTION)
+ .build();
+
+ @Override
+ public SqlOperator calciteOperator() {
+ return SQL_FUNCTION;
+ }
+
+ @Nullable
+ @Override
+ public DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode) {
+ return OperatorConversions.convertDirectCall(plannerContext, rowSignature, rexNode, "current_timestamp_millis");
+ }
+}
diff --git a/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprTest.java b/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprTest.java
new file mode 100644
index 0000000..db83dd9
--- /dev/null
+++ b/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprTest.java
@@ -0,0 +1,148 @@
+package org.apache.druid.query.udf.expressions;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.math.expr.*;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class CurrentTimestampMillisExprTest extends InitializedNullHandlingTest {
+ private final ExprMacroTable exprMacroTable = new ExprMacroTable(Collections.singletonList(new CurrentTimestampMillisExprMacro()));
+ Expr.ObjectBinding inputBindings = InputBindings.forInputSuppliers(
+ new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
+ .put("string", InputBindings.inputSupplier(ExpressionType.STRING, () -> "abcdef"))
+ .put("long", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1234L))
+ .put("double", InputBindings.inputSupplier(ExpressionType.DOUBLE, () -> 1.234))
+ .put("array1", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new Object[]{"1", "2", "3"}))
+ .put("array2", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new String[]{"1", "2", "3"}))
+ .put("nullString", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("nullLong", InputBindings.inputSupplier(ExpressionType.LONG, () -> null))
+ .put("nullDouble", InputBindings.inputSupplier(ExpressionType.DOUBLE, () -> null))
+ .build()
+ );
+
+ Expr.ObjectBinding[] inputBindingArray = new Expr.ObjectBinding[]{
+ InputBindings.forInputSuppliers(
+ new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
+ .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+ .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+ .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+ .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+ .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
+ .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .build()
+ ),
+ InputBindings.forInputSuppliers(
+ new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
+ .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+ .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+ .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+ .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+ .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
+ .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new Object[]{"5","7","8"}))
+ .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .build()
+ ),
+ InputBindings.forInputSuppliers(
+ new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
+ .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+ .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+ .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+ .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+ .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1L))
+ .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5.245.228.51"))
+ .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .build()
+ ),
+ // ...
+ InputBindings.forInputSuppliers(
+ new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
+ .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+ .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+ .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+ .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+ .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
+ .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .build()
+ ),
+ InputBindings.forInputSuppliers(
+ new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
+ .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+ .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+ .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+ .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+ .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
+ .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5,7,8"))
+ .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .build()
+ ),
+ InputBindings.forInputSuppliers(
+ new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
+ .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+ .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+ .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+ .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+ .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1L))
+ .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5.245.228.51"))
+ .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .build()
+ ),
+ };
+
+ @Test
+ public void test() throws Exception{
+ Expr expr = Parser.parse("current_timestamp_millis()", exprMacroTable);
+ System.out.println(expr.analyzeInputs().getRequiredBindings());
+ ExprEval eval = expr.eval(inputBindings);
+ System.out.println(eval.value());
+ Thread.sleep(1000);
+ eval = expr.eval(inputBindings);
+ System.out.println(eval.value());
+ Thread.sleep(1000);
+ expr = Parser.parse("current_timestamp_millis()", exprMacroTable);
+ eval = expr.eval(inputBindings);
+ System.out.println(eval.value());
+
+ }
+
+
+}