找回密码
 立即注册
首页 业界区 业界 6. Calcite添加自定义函数

6. Calcite添加自定义函数

宿遘稠 2025-6-4 22:17:17
1. 简介

在上篇博文中介绍了如何使用calcite进行sql验证, 但是真正在实际生产环境中我们可能需要使用到

  • 用户自定义函数(UDF): 通过代码实现对应的函数逻辑并注册给calcite

    • sql验证: 将UDF信息注册给calcite, SqlValidator.validator验证阶段即可通过验证
    • sql执行: calcite通过调用UDF逻辑实现函数逻辑

  • 自定义db函数: 数据库中创建的自定义函数

    • sql验证: 将自定义的db函数信息注册给calcite, SqlValidator.validator验证阶段即可通过验证
    • sql执行: 下推到db执行对应的db函数

此时我们需要将自定义的函数注册到calcite中, 用于sql验证和执行. 例如注册一个简单的函数 如: 将数据库中的性别字段值做字典转换.
2. Maven
  1. <dependency>
  2.     <groupId>org.apache.calcite</groupId>
  3.     calcite-core</artifactId>
  4.     <version>1.37.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>mysql</groupId>
  8.     mysql-connector-java</artifactId>
  9.     <version>8.0.33</version>
  10. </dependency>
复制代码
2. UDF

如上述所说, UDF是将用户自定义的方法注册为函数使用的, 首先看一下calcite是如何注册UDF的
  1. SchemaPlus#add(String name, Function function);
复制代码
其Function的实现类如下:
1.png


  • 定义UDF实现
    1. public class Udf {
    2.     public static String dictSex(String code) {
    3.         if (StringUtils.isBlank(code)) {
    4.             return code;
    5.         }
    6.         if (StringUtils.equals(code, "1")) {
    7.             return "男";
    8.         } else if (StringUtils.equals(code, "2")) {
    9.             return "女";
    10.         }
    11.         else {
    12.             return "未知";
    13.         }
    14.     }
    15. }
    复制代码
  • 把dictSex方法注册到calcite中, 因为上述的方法输入返回的都是单一值, 所以直接注册为标量函数即可(如果是聚合函数可以使用AggregateFunction)
    1. // 指定函数名称 和 对应函数的class & method name
    2. rootSchema.add("dict_sex", ScalarFunctionImpl.create(Udf.class, "dictSex"));
    复制代码
  • 测试执行
    1. final ResultSet resultSet = statement.executeQuery("SELECT username, dict_sex(sex) sex_name FROM `user`");
    2. printResultSet(resultSet);
    复制代码
    表数据如下
    2.png

    输出结果
    1. c.l.c.CalciteFuncTest - [printResultSet,86] - Number of columns: 2
    2. c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=男, username=张三}
    3. c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=女, username=李四}
    4. c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=女, username=张铁牛}
    复制代码
3. 自定义db函数

首先 我们定义一个db 函数实现字典值的转换
  1. DELIMITER //
  2. CREATE FUNCTION dict_sex(code VARCHAR(10))
  3. RETURNS VARCHAR(10)
  4. DETERMINISTIC
  5. BEGIN
  6.     -- 如果code为空或只包含空白字符,则直接返回code
  7.     IF code IS NULL OR TRIM(code) = '' THEN
  8.         RETURN code;
  9.     END IF;
  10.     -- 如果code为'1'则返回'男'
  11.     IF code = '1' THEN
  12.         RETURN '男';
  13.     -- 如果code为'2'则返回'女'
  14.     ELSEIF code = '2' THEN
  15.         RETURN '女';
  16.     ELSE
  17.         RETURN '未知';
  18.     END IF;
  19. END //
  20. DELIMITER ;
复制代码
验证函数功能
3.png

ok, 函数创建完成, 我们将函数注册到calcite中
calcite中sqlfunction有很多其已经实现的类, 我们这里使用SqlBasicFunction来创建我们的函数
4.png


  • 定义SqlFunction
    1. /*
    2. * SqlBasicFunction create(String name, SqlReturnTypeInference returnTypeInference, SqlOperandTypeChecker operandTypeChecker)
    3. * name: 函数名称
    4. * returnTypeInference: 返回值类型
    5. * operandTypeChecker: 函数入参的校验器
    6. */
    7. SqlFunction DICT_SEX = SqlBasicFunction.create("dict_sex", ReturnTypes.VARCHAR, OperandTypes.family(SqlTypeFamily.CHARACTER));
    复制代码
  • 注册SqlFunction
    从上篇博文中我们知道, calcite的sql函数都注册到了SqlStdOperatorTable类中, 所以我们只需要将自定义的函数注册进即可
    1. final SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance();
    2. sqlStdOperatorTable.register(DICT_SEX);
    复制代码
    对, 就这么简单. 因为SqlStdOperatorTable类是单例模式, 所以我们可以随时随地的进行注册, 其验证逻辑就可以直接调用了
    当然, 看了其他博客大多数都是继承SqlStdOperatorTable类实现自定义SqlStdOperatorTable的 如下, 最后使用自己的SqlStdOperatorTable即可
    1. public static class SqlCustomOperatorTable extends SqlStdOperatorTable {
    2.     private static SqlCustomOperatorTable instance;
    3.     // 只需要申明为成员变量即可, instance.init() 的时候会反射取变量进行注册
    4.     public static final SqlFunction DICT_SEX = SqlBasicFunction.create("dict_sex", ReturnTypes.VARCHAR, OperandTypes.family(SqlTypeFamily.CHARACTER));
    5.     public static synchronized SqlCustomOperatorTable instance() {
    6.         if (instance == null) {
    7.             instance = new SqlCustomOperatorTable();
    8.             instance.init();
    9.         }
    10.         return instance;
    11.     }
    12.   
    13.     /**
    14.      * 如果想修改获取函数的过程, 可以重写此方法
    15.      */
    16.     @Override
    17.     protected void lookUpOperators(String name, boolean caseSensitive, Consumer<SqlOperator> consumer) {
    18.         super.lookUpOperators(name, caseSensitive, consumer);
    19.     }
    20. }
    复制代码
  • 测试执行
    1. final ResultSet resultSet = statement.executeQuery("SELECT username, dict_sex(sex) sex_name FROM `user`");
    2. printResultSet(resultSet);
    复制代码
    输出结果
    1. c.l.c.CalciteFuncTest - [printResultSet,86] - Number of columns: 2
    2. c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=男, username=张三}
    3. c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=女, username=李四}
    4. c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=女, username=张铁牛}
    复制代码
    经测试: 如果udf 和 sqlfunction 同时存在的时候 优先使用udf
4. 完整代码

4.1 udf

[code]package com.ldx.calcite;import com.google.common.collect.Maps;import com.mysql.cj.jdbc.MysqlDataSource;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import org.apache.calcite.adapter.jdbc.JdbcSchema;import org.apache.calcite.config.Lex;import org.apache.calcite.jdbc.CalciteConnection;import org.apache.calcite.schema.SchemaPlus;import org.apache.calcite.schema.impl.ScalarFunctionImpl;import org.junit.jupiter.api.BeforeAll;import org.junit.jupiter.api.Test;import javax.sql.DataSource;import java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.ResultSetMetaData;import java.sql.SQLException;import java.sql.Statement;import java.util.Map;import java.util.Properties;import static org.apache.calcite.config.CalciteConnectionProperty.LEX;@Slf4jpublic class CalciteFuncWithUdfTest {    private static Statement statement;    @BeforeAll    @SneakyThrows    public static void beforeAll() {        Properties info = new Properties();        // 不区分sql大小写        info.setProperty("caseSensitive", "false");        info.setProperty(LEX.camelName(), Lex.MYSQL.name());        // 创建Calcite连接        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);        // 构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下        SchemaPlus rootSchema = calciteConnection.getRootSchema();        // 设置默认的schema, 如果不设置sql中需要加上对应数据源的名称        calciteConnection.setSchema("my_mysql");        final DataSource mysqlDataSource = getMysqlDataSource();        final JdbcSchema schemaWithMysql = JdbcSchema.create(rootSchema, "my_mysql", mysqlDataSource, "test", null);        final SchemaPlus myMysqlSchema = rootSchema.add("my_mysql", schemaWithMysql);        // 全局注册        rootSchema.add("dict_sex", ScalarFunctionImpl.create(Udf.class, "dictSex"));        statement = calciteConnection.createStatement();        // 只注册到mysql schema中        // myMysqlSchema.add("dict_sex", ScalarFunctionImpl.create(Udf.class, "dictSex"));        // 创建SQL语句执行查询        statement = calciteConnection.createStatement();    }    @Test    @SneakyThrows    public void test_udf_func() {        final ResultSet resultSet = statement.executeQuery("SELECT username, dict_sex(sex) sex_name FROM `user`");        printResultSet(resultSet);    }    private static DataSource getMysqlDataSource() {        MysqlDataSource dataSource = new MysqlDataSource();        dataSource.setUrl("jdbc:mysql://localhost:3306/test");        dataSource.setUser("root");        dataSource.setPassword("123456");        return dataSource;    }    public static void printResultSet(ResultSet resultSet) throws SQLException {        // 获取 ResultSet 元数据        ResultSetMetaData metaData = resultSet.getMetaData();        // 获取列数        int columnCount = metaData.getColumnCount();        log.info("Number of columns: {}",columnCount);        // 遍历 ResultSet 并打印结果        while (resultSet.next()) {            final Map item = Maps.newHashMap();            // 遍历每一列并打印            for (int i = 1; i
您需要登录后才可以回帖 登录 | 立即注册