Skip to content

Commit 95b8309

Browse files
authored
[FLINK-24862][hive]Fix hive user-defined function cannot be used normally in hive dialect (apache#17761)
1 parent a79e004 commit 95b8309

File tree

2 files changed

+37
-9
lines changed

2 files changed

+37
-9
lines changed

flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
5959
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
6060
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
61+
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
6162
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
6263
import org.apache.hadoop.hive.serde.serdeConstants;
6364
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
@@ -635,6 +636,35 @@ public void testTemporaryFunction() throws Exception {
635636
tableEnv.executeSql("drop temporary function if exists foo");
636637
}
637638

639+
@Test
640+
public void testTemporaryFunctionUDAF() throws Exception {
641+
// create temp function
642+
tableEnv.executeSql(
643+
String.format(
644+
"create temporary function temp_count as '%s'",
645+
GenericUDAFCount.class.getName()));
646+
String[] functions = tableEnv.listUserDefinedFunctions();
647+
assertArrayEquals(new String[] {"temp_count"}, functions);
648+
// call the function
649+
tableEnv.executeSql("create table src(x int)");
650+
tableEnv.executeSql("insert into src values (1),(-1)").await();
651+
assertEquals(
652+
"[+I[2]]",
653+
queryResult(tableEnv.sqlQuery("select temp_count(x) from src")).toString());
654+
// switch DB and the temp function can still be used
655+
tableEnv.executeSql("create database db1");
656+
tableEnv.useDatabase("db1");
657+
assertEquals(
658+
"[+I[2]]",
659+
queryResult(tableEnv.sqlQuery("select temp_count(x) from `default`.src"))
660+
.toString());
661+
// drop the function
662+
tableEnv.executeSql("drop temporary function temp_count");
663+
functions = tableEnv.listUserDefinedFunctions();
664+
assertEquals(0, functions.length);
665+
tableEnv.executeSql("drop temporary function if exists foo");
666+
}
667+
638668
@Test
639669
public void testCatalog() {
640670
List<Row> catalogs =

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -631,16 +631,14 @@ private void validateAndPrepareFunction(CatalogFunction function)
631631
// In this situation the UDF have not been validated and cleaned, so we need to validate it
632632
// and clean its closure here.
633633
// If the input is instance of `ScalarFunctionDefinition`, `TableFunctionDefinition` and so
634-
// on,
635-
// it means it uses the old type inference. We assume that they have been validated before
636-
// being
637-
// wrapped.
638-
if (function instanceof InlineCatalogFunction
639-
&& ((InlineCatalogFunction) function).getDefinition()
640-
instanceof UserDefinedFunction) {
641-
634+
// on, it means it uses the old type inference. We assume that they have been validated
635+
// before being wrapped.
636+
if (function instanceof InlineCatalogFunction) {
642637
FunctionDefinition definition = ((InlineCatalogFunction) function).getDefinition();
643-
UserDefinedFunctionHelper.prepareInstance(config, (UserDefinedFunction) definition);
638+
if (definition instanceof UserDefinedFunction) {
639+
UserDefinedFunctionHelper.prepareInstance(config, (UserDefinedFunction) definition);
640+
}
641+
// Skip validation if it's not a UserDefinedFunction.
644642
} else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
645643
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
646644
UserDefinedFunctionHelper.validateClass(

0 commit comments

Comments
 (0)