flink-connector-kudu, based on the Apache Bahir Kudu connector, with support for Flink 1.11.x DynamicTableSource/Sink, Range partitioning, and more.

Overview

Kudu Connector

  • A Kudu connector adapted from the Apache Bahir Kudu Connector for internal company use. It supports Range partitioning, configurable Hash bucket counts, Flink 1.11.x dynamic table sources/sinks, and more. Part of the added functionality has been contributed back to the community.

Usage

  • Clone the code, change the Maven coordinates in the pom, and upload the artifact to your company's private repository.

Using the Kudu Catalog

Creating a Catalog

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KuduCatalog catalog = new KuduCatalog("cdh01:7051,cdh02:7051,cdh03:7051");
StreamTableEnvironment tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerStreamingMode(env);
tableEnv.registerCatalog("kudu", catalog);
tableEnv.useCatalog("kudu");

Catalog API

// drop a table through the catalog
catalog.dropTable(new ObjectPath("default_database", "test_Replice_kudu"), true);
// operate on tables through SQL
tableEnv.sqlQuery("select * from test");
tableEnv.executeSql("drop table test");
tableEnv.executeSql("insert into testRange values(1,'hsm')");

FlinkSQL

KuduTable Properties

  • The connector.type property (Flink 1.10.x) or the connector property (Flink 1.11.x) determines whether the TableSourceFactory or the KuduDynamicTableSource implementation is used.
kudu.table = name of the Kudu table to map
kudu.masters = Kudu master addresses
kudu.hash-columns = hash partition key columns of the table, separated by ","
kudu.replicas = number of Kudu tablet replicas, default 3
kudu.hash-partition-nums = number of hash buckets, default 2 * replicas
kudu.range-partition-rule = range partition rule in the form rangeKey#leftValue,rightValue:rangeKey#leftValue1,rightValue1; the rangeKey must be a primary-key column
kudu.primary-key-columns = primary key columns of the Kudu table, separated by ","; they must be declared in order
kudu.lookup.cache.max-rows = maximum number of rows cached for the Kudu lookup (temporal) table; caching is disabled by default
kudu.lookup.cache.ttl = TTL of the Kudu lookup table cache
kudu.lookup.max-retries = number of retries when a temporal-table join lookup fails, default 3
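
As an illustration of how the partitioning options combine, here is a rough sketch of a DDL submitted through the Table API; the table name, columns, bucket count and range boundaries are made up, so adjust them to your own schema:

// Sketch only: hash partitioning on id plus range partitioning on age,
// following the documented rangeKey#leftValue,rightValue:... format.
tableEnv.executeSql(
    "CREATE TABLE user_profile (" +
    "  id INT," +
    "  age INT," +
    "  name STRING" +
    ") WITH (" +
    "  'connector' = 'kudu'," +
    "  'kudu.masters' = 'cdh01:7051,cdh02:7051,cdh03:7051'," +
    "  'kudu.table' = 'user_profile'," +
    "  'kudu.primary-key-columns' = 'id,age'," +
    "  'kudu.hash-columns' = 'id'," +
    "  'kudu.hash-partition-nums' = '6'," +
    "  'kudu.replicas' = '3'," +
    "  'kudu.range-partition-rule' = 'age#0,18:age#18,60'" +
    ")");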

Flink 1.10.x

CREATE TABLE TestTableTableSourceFactory (
  first STRING,
  second STRING,
  third INT NOT NULL
) WITH (
  'connector.type' = 'kudu',
  'kudu.masters' = '...',
  'kudu.table' = 'TestTable',
  'kudu.hash-columns' = 'first',
  'kudu.primary-key-columns' = 'first,second'
)

Flink 1.11.x

CREATE TABLE TestTableKuduDynamicTableSource (
  first STRING,
  second STRING,
  third INT NOT NULL
) WITH (
  'connector' = 'kudu',
  'kudu.masters' = '...',
  'kudu.table' = 'TestTable',
  'kudu.hash-columns' = 'first',
  'kudu.primary-key-columns' = 'first,second'
)

DataStream Usage

  • For DataStream usage, see the official bahir-flink documentation; in practice this is rarely needed by data-warehouse engineers.

Version History

Version 1.1 Features

  • Added configuration of the number of hash buckets via kudu.hash-partition-nums.
  • Added range partition rules; Hash and Range partitioning can be combined. Configured via kudu.range-partition-rule, with rules in the format rangeKey#leftValue,rightValue:rangeKey#leftValue1,rightValue1.
  • Added Kudu temporal (lookup) table support; the in-memory cache size and TTL are controlled through the kudu.lookup.* options:
/**
 * Maximum number of rows in the lookup cache.
 */
public static final String KUDU_LOOKUP_CACHE_MAX_ROWS = "kudu.lookup.cache.max-rows";

/**
 * TTL of the lookup cache.
 */
public static final String KUDU_LOOKUP_CACHE_TTL = "kudu.lookup.cache.ttl";

/**
 * Number of retries for Kudu connections.
 */
public static final String KUDU_LOOKUP_MAX_RETRIES = "kudu.lookup.max-retries";
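
A rough sketch of how the lookup options and a temporal join might be used together, shown with the Flink 1.11.x style options. The dimension table, the fact table orders and its processing-time attribute proc_time are all assumed for illustration, and the exact value format of kudu.lookup.cache.ttl depends on the connector's option parsing:

// Sketch only: Kudu dimension table with the lookup cache enabled.
tableEnv.executeSql(
    "CREATE TABLE dim_user (" +
    "  id INT," +
    "  name STRING" +
    ") WITH (" +
    "  'connector' = 'kudu'," +
    "  'kudu.masters' = 'cdh01:7051,cdh02:7051,cdh03:7051'," +
    "  'kudu.table' = 'dim_user'," +
    "  'kudu.primary-key-columns' = 'id'," +
    "  'kudu.hash-columns' = 'id'," +
    "  'kudu.lookup.cache.max-rows' = '10000'," +
    "  'kudu.lookup.cache.ttl' = '60000'," +
    "  'kudu.lookup.max-retries' = '3'" +
    ")");

// Lookup join against the Kudu table; 'orders' is assumed to be a streaming table
// that already declares a processing-time attribute 'proc_time'.
Table joined = tableEnv.sqlQuery(
    "SELECT o.order_id, d.name " +
    "FROM orders AS o " +
    "JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS d " +
    "ON o.user_id = d.id");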

Implementation

  • A custom KuduLookupFunction is implemented and KuduTableSource implements the LookupableTableSource interface, returning this LookupFunction to provide temporal (lookup) table functionality. Instead of the Guava cache used by the Flink JDBC connector, the underlying cache is Caffeine, which is more efficient and also reduces the load that large numbers of lookup requests would otherwise put on Kudu. A rough sketch of the caching idea is shown below.
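
This is not the connector's actual code, just a minimal illustration of a Caffeine-backed lookup cache keyed by the join key; class and method names are made up:

import java.time.Duration;
import java.util.List;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.flink.types.Row;

public class LookupCacheSketch {
    // Cache from lookup key to the rows found in Kudu for that key.
    private final Cache<Row, List<Row>> cache;

    public LookupCacheSketch(long maxRows, long ttlMillis) {
        this.cache = Caffeine.newBuilder()
                .maximumSize(maxRows)                           // kudu.lookup.cache.max-rows
                .expireAfterWrite(Duration.ofMillis(ttlMillis)) // kudu.lookup.cache.ttl
                .build();
    }

    public List<Row> getOrLoad(Row key, java.util.function.Function<Row, List<Row>> kuduScan) {
        // Serve from the cache if present; otherwise scan Kudu and remember the result.
        return cache.get(key, kuduScan);
    }
}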

Roadmap

Known Issues

  1. The primary key cannot be inferred automatically from SQL statements

The enhancements on top of the Apache Bahir Kudu Connector were mainly built to serve internal business needs. One problem encountered with this version of the connector is that the primary key of a SQL statement cannot be inferred automatically, so query results cannot be passed straight to a downstream table. Internally, our Tiangong engine works around this by running the query through the Flink Table API sqlQuery method to obtain a Table, converting the Table into a retract stream (DataStream<Tuple2<Boolean, Row>>), and finally writing that stream to Kudu through the KuduSink together with the UpsertOperationMapper provided by the connector. A sketch of this workaround follows.
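
The sketch below assumes the KuduWriterConfig, KuduTableInfo, KuduSink and UpsertOperationMapper classes from the bahir-flink connector; the query, target table name and column list are illustrative, and the UpsertOperationMapper constructor arguments are an assumption:

// Query through the Table API and convert the result to a retract stream.
Table result = tableEnv.sqlQuery("SELECT id, name FROM some_source");
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(result, Row.class);

// Write the retract stream to Kudu; the Boolean flag tells the mapper whether
// a record is an upsert or a retraction.
KuduWriterConfig writerConfig = KuduWriterConfig.Builder
        .setMasters("cdh01:7051,cdh02:7051,cdh03:7051")
        .build();
retractStream.addSink(new KuduSink<>(
        writerConfig,
        KuduTableInfo.forTable("target_table"),
        new UpsertOperationMapper(new String[]{"id", "name"})));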

Next Steps

  • Provide dynamic table sources/sinks to solve this problem: rework the pre-1.11 KuduTableSource/KuduTableSink into implementations of the DynamicTableSource/DynamicTableSink interfaces, which makes primary-key inference work.

Version 1.2 Features

  • Reworked to the DynamicTableSource/DynamicTableSink interfaces introduced in Flink 1.11.x, which solves the SQL primary-key inference problem and allows stream/batch joins to be written directly in SQL, with no need to convert to DataStream for multi-table joins.
  • Built-in metrics reporting: the Kudu tables handled at the dynamic table factory entry point are instrumented with metrics, giving better visibility into data written to Kudu.