diff options
4 files changed, 244 insertions, 0 deletions
diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java b/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java index d09e02a..51248c4 100644 --- a/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java +++ b/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java @@ -3,7 +3,9 @@ package org.apache.druid.query.udf; import com.google.inject.Binder; import org.apache.druid.guice.ExpressionModule; import org.apache.druid.initialization.DruidModule; +import org.apache.druid.query.udf.expressions.CurrentTimestampMillisExprMacro; import org.apache.druid.query.udf.expressions.DimensionBucketExprMacro; +import org.apache.druid.query.udf.sql.CurrentTimestampMillisOperatorConversion; import org.apache.druid.query.udf.sql.DimensionBucketOperatorConversion; import org.apache.druid.sql.guice.SqlBindings; @@ -11,7 +13,9 @@ public class UdfModule implements DruidModule { @Override public void configure(Binder binder) { SqlBindings.addOperatorConversion(binder, DimensionBucketOperatorConversion.class); + SqlBindings.addOperatorConversion(binder, CurrentTimestampMillisOperatorConversion.class); ExpressionModule.addExprMacro(binder, DimensionBucketExprMacro.class); + ExpressionModule.addExprMacro(binder, CurrentTimestampMillisExprMacro.class); } /*@Override diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprMacro.java b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprMacro.java new file mode 100644 index 0000000..9535223 --- /dev/null +++ b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprMacro.java @@ -0,0 +1,57 @@ +package org.apache.druid.query.udf.expressions;
+
+import org.apache.druid.math.expr.*;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class CurrentTimestampMillisExprMacro implements ExprMacroTable.ExprMacro {
+ private static final String NAME = "current_timestamp_millis"; // current_timestamp_millis
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public Expr apply(List<Expr> args) {
+ validationHelperCheckArgumentCount(args, 0);
+
+ class CurrentTimestampMillisExpr implements Expr {
+
+ @Override
+ public ExprEval eval(ObjectBinding bindings) {
+ return ExprEval.of(System.currentTimeMillis());
+ }
+
+ @Override
+ public String stringify() {
+ return "current_timestamp_millis";
+ }
+
+ @Override
+ public Expr visit(Shuttle shuttle) {
+ return shuttle.visit(this);
+ }
+
+ @Override
+ public BindingAnalysis analyzeInputs() {
+ return BindingAnalysis.EMTPY;
+ }
+
+ @Nullable
+ @Override
+ public ExpressionType getOutputType(InputBindingInspector inspector) {
+ return ExpressionType.LONG;
+ }
+
+ @Override
+ public boolean canVectorize(InputBindingInspector inspector) {
+ return false;
+ }
+ }
+
+ return new CurrentTimestampMillisExpr();
+ }
+
+}
diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/sql/CurrentTimestampMillisOperatorConversion.java b/druid-udf/src/main/java/org/apache/druid/query/udf/sql/CurrentTimestampMillisOperatorConversion.java new file mode 100644 index 0000000..f75f5c7 --- /dev/null +++ b/druid-udf/src/main/java/org/apache/druid/query/udf/sql/CurrentTimestampMillisOperatorConversion.java @@ -0,0 +1,35 @@ +package org.apache.druid.query.udf.sql; + +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.type.*; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.OperatorConversions; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; +import org.apache.druid.sql.calcite.planner.PlannerContext; + +import javax.annotation.Nullable; + +public class CurrentTimestampMillisOperatorConversion implements SqlOperatorConversion { + private static final SqlFunction SQL_FUNCTION = OperatorConversions + .operatorBuilder("CURRENT_TIMESTAMP_MILLIS") + .operandTypes(SqlTypeFamily.ANY) + .requiredOperands(0) + .returnTypeNonNull(SqlTypeName.BIGINT) + .functionCategory(SqlFunctionCategory.USER_DEFINED_FUNCTION) + .build(); + + @Override + public SqlOperator calciteOperator() { + return SQL_FUNCTION; + } + + @Nullable + @Override + public DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode) { + return OperatorConversions.convertDirectCall(plannerContext, rowSignature, rexNode, "current_timestamp_millis"); + } +} diff --git a/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprTest.java b/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprTest.java new file mode 100644 index 0000000..db83dd9 --- /dev/null +++ b/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprTest.java @@ -0,0 +1,148 @@ +package org.apache.druid.query.udf.expressions;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.math.expr.*;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class CurrentTimestampMillisExprTest extends InitializedNullHandlingTest {
+ private final ExprMacroTable exprMacroTable = new ExprMacroTable(Collections.singletonList(new CurrentTimestampMillisExprMacro()));
+ Expr.ObjectBinding inputBindings = InputBindings.forInputSuppliers(
+ new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
+ .put("string", InputBindings.inputSupplier(ExpressionType.STRING, () -> "abcdef"))
+ .put("long", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1234L))
+ .put("double", InputBindings.inputSupplier(ExpressionType.DOUBLE, () -> 1.234))
+ .put("array1", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new Object[]{"1", "2", "3"}))
+ .put("array2", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new String[]{"1", "2", "3"}))
+ .put("nullString", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("nullLong", InputBindings.inputSupplier(ExpressionType.LONG, () -> null))
+ .put("nullDouble", InputBindings.inputSupplier(ExpressionType.DOUBLE, () -> null))
+ .build()
+ );
+
+ Expr.ObjectBinding[] inputBindingArray = new Expr.ObjectBinding[]{
+ InputBindings.forInputSuppliers(
+ new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
+ .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+ .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+ .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+ .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+ .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
+ .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .build()
+ ),
+ InputBindings.forInputSuppliers(
+ new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
+ .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+ .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+ .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+ .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+ .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
+ .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new Object[]{"5","7","8"}))
+ .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .build()
+ ),
+ InputBindings.forInputSuppliers(
+ new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
+ .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+ .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+ .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+ .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+ .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1L))
+ .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+ .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5.245.228.51"))
+ .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .build()
+ ),
+ // ...
+ InputBindings.forInputSuppliers(
+ new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
+ .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+ .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+ .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+ .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+ .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
+ .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .build()
+ ),
+ InputBindings.forInputSuppliers(
+ new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
+ .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+ .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+ .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+ .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+ .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
+ .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5,7,8"))
+ .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .build()
+ ),
+ InputBindings.forInputSuppliers(
+ new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
+ .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+ .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+ .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+ .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+ .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1L))
+ .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5.245.228.51"))
+ .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+ .build()
+ ),
+ };
+
+ @Test
+ public void test() throws Exception{
+ Expr expr = Parser.parse("current_timestamp_millis()", exprMacroTable);
+ System.out.println(expr.analyzeInputs().getRequiredBindings());
+ ExprEval eval = expr.eval(inputBindings);
+ System.out.println(eval.value());
+ Thread.sleep(1000);
+ eval = expr.eval(inputBindings);
+ System.out.println(eval.value());
+ Thread.sleep(1000);
+ expr = Parser.parse("current_timestamp_millis()", exprMacroTable);
+ eval = expr.eval(inputBindings);
+ System.out.println(eval.value());
+
+ }
+
+
+}
|
