54、Flink 测试工具测试 Flink 作业详解

测试 Flink 作业
a)JUnit 规则 MiniClusterWithClientResource

Apache Flink 提供了一个名为 MiniClusterWithClientResource 的 Junit 规则,用于针对本地嵌入式小型集群测试完整的作业。 叫做 MiniClusterWithClientResource.

要使用 MiniClusterWithClientResource,需要添加一个额外的依赖项(测试范围)。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.19.0</version>    
    <scope>test</scope>
</dependency>

示例:MapFunction

public class IncrementMapFunction implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }
}

在本地 Flink 集群使用这个 MapFunction 的简单 pipeline,如下所示。

public class ExampleIntegrationTest {

     @ClassRule
     public static MiniClusterWithClientResource flinkCluster =
         new MiniClusterWithClientResource(
             new MiniClusterResourceConfiguration.Builder()
                 .setNumberSlotsPerTaskManager(2)
                 .setNumberTaskManagers(1)
                 .build());

    @Test
    public void testIncrementPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
                .map(new IncrementMapFunction())
                .addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(2L, 22L, 23L));
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static
        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            values.add(value);
        }
    }
}

使用 MiniClusterWithClientResource 进行集成测试的注意

  • 为了不将整个 pipeline 代码从生产复制到测试,请将 source 和 sink 在生产代码中设置成可插拔的,并在测试中注入特殊的测试 source 和测试 sink。
  • 这里使用 CollectSink 中的静态变量,是因为Flink 在将所有算子分布到整个集群之前先对其进行了序列化。 解决此问题的一种方法是与本地 Flink 小型集群通过实例化算子的静态变量进行通信。 或者,可以使用测试的 sink 将数据写入临时目录的文件中。
  • 如果作业使用事件时间定时器,则可以实现自定义的 并行 源函数来发出 watermark。
  • 建议始终以 parallelism > 1 的方式在本地测试 pipeline,以识别只有在并行执行 pipeline 时才会出现的 bug。
  • 优先使用 @ClassRule 而不是 @Rule,这样多个测试可以共享同一个 Flink 集群。可以节省大量的时间,因为 Flink 集群的启动和关闭通常会占用实际测试的执行时间。
  • 如果 pipeline 包含自定义状态处理,则可以通过启用 checkpoint 并在小型集群中重新启动作业来测试其正确性。为此,需要在 pipeline 中(仅测试)抛出用户自定义函数的异常来触发失败。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/764580.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【论文阅读】XuanYuan: An AI-Native Database

XuanYuan: An AI-Native Database 这篇文章主要是讨论了AI4DB 和 DB4AI 集成的数据库架构&#xff0c;以此提出了AI原生的数据库&#xff0c;架构如下&#xff1a; 而具体发展阶段来说&#xff0c;AI原生数据库主要由五个阶段组成 第一阶段&#xff0c;AI建议型数据库&#xf…

MQ运行时遇到的问题

遇到的问题描述&#xff1a;我在绑定通道的时候发现了通道绑定失败&#xff0c; 原因&#xff1a; 在代码中我第一次创建交换机的时候类型的默认没有修改成topic类型的&#xff0c;导致后面的代码再去进行注册的时候并没有实现那个类型 解决&#xff1a; 更改代码&#xff0…

对不起,AI大模型不是风口

“我们正处在全新起点&#xff0c;这是一个以大模型为核心的人工智能新时代&#xff0c;大模型改变了人工智能&#xff0c;大模型即将改变世界。”——5月26日&#xff0c;百度创始人、董事长兼CEO李彦宏先生在2023中关村论坛发表了《大模型改变世界》演讲。 李彦宏指出&#…

S7---代码编译和固件下载

目录 1.代码下载 2. 工具安装 3.环境变量 4.驱动安装 5.代码编译 6.固件下载 S7和S7 Pro Gen 1音频平台 S7 Gen 1音频平台基于QCC722x蓝牙音频SoC&#xff0c;针对耳塞和其他便携式和可穿戴应用。 S7 Pro Gen 1音频平台基于QCC722x蓝牙音频SoC和QCP7321微电源Wi-Fi收发器…

Nacos2.3.x动态刷新不生效

1.日志分析 Ignore the empty nacos configuration and get it based on dataId[null.yaml] & group[DEFAULT_GROUP] Ignore the empty nacos configuration and get it based on dataId[null-local.yaml] & group[DEFAULT_GROUP] 从日志文件分析中可以得到 dataId[n…

TypeScript 中 const enum 和 enum 的核心区别在哪?日常开发应该使用哪个?

编译结果 enum 会生成一个对象&#xff0c;引用的地方保持对其引用 const enum 会擦除 enum 定义的代码&#xff0c;引用的地方会生成 inline code 使用enum&#xff1a; 使用const enum&#xff1a; PS&#xff1a;编译选项 preserveConstEnums 可以使 const enum 不去擦除 …

深度学习之半监督学习:一文梳理目标检测中的半监督学习策略

什么是半监督目标检测&#xff1f; 传统机器学习根据训练数据集中的标注情况&#xff0c;有着不同的场景&#xff0c;主要包括&#xff1a;监督学习、弱监督学习、弱半监督学习、半监督学习。由于目标检测任务的特殊性&#xff0c;在介绍半监督目标检测方法之前&#xff0c;我…

镜像私服Harbor 2.0安装-探索工厂模式:如何优化Harbor项目管理与API集成

文章目录 一、docker-compose1. 下载 Docker Compose&#xff1a;2.添加执行权限&#xff1a;3.验证安装 二、安装harbor 2.01.下载harbor离线包2. 根据需求配置 Harbor3.给harbor创建SSL证书4.预编译harbor5. 安装并启动 Harbor (必须到你安装的目录) 三、登录harbor的web页面…

哈尔滨如何选择合适的等保测评机构?

选择合适的等保测评机构确实需要细致考虑&#xff0c;您提到的八个方面已经非常全面&#xff0c;涵盖了资质、专业能力、服务质量和合规性等多个关键点。为了进一步确保所选机构的可靠性&#xff0c;还可以考虑以下几点&#xff1a; 1.技术创新与工具&#xff1a;了解测评机构是…

鸿蒙生态应用开发白皮书V3.0

来源&#xff1a;华为&#xff1a; 近期历史回顾&#xff1a;

红酒SPA:享受放松与奢华的很好结合

在繁忙的都市生活中&#xff0c;人们总是渴望找到一片宁静的天地&#xff0c;让疲惫的身心得到很好的放松。而红酒SPA&#xff0c;作为一种不同的放松方式&#xff0c;将红酒的浪漫与SPA的舒适整合&#xff0c;为现代人带来了一场奢华享受。 一、红酒的浪漫与SPA的舒适 红酒&a…

北京网站建设怎么开始做

北京作为中国的首都&#xff0c;拥有众多的企业和机构&#xff0c;网站建设不仅是一种宣传和推广的手段&#xff0c;更是企业发展的必备工具。但是对于很多企业来说&#xff0c;网站建设是一个相对陌生的领域&#xff0c;不知道从哪里开始。今天我们就来谈一谈北京网站建设的步…

算法-位图与底层运算逻辑

文章目录 1. 位图的理论基础2. 完整版位图实现3. 底层的运算逻辑-位运算 1. 位图的理论基础 首先我们要理解什么是位图, 位图的一些作用是什么 位图法就是bitmap的缩写。所谓bitmap&#xff0c;就是用每一位来存放某种状态&#xff0c;适用于大规模数据&#xff0c;但数据状态又…

【HDC.2024】探索无限可能:华为云区块链+X,创新融合新篇章

6月23日&#xff0c;华为开发者大会2024&#xff08;HDC 2024&#xff09;期间&#xff0c; “「区块链X」多元行业场景下的创新应用”分论坛在东莞松山湖举行&#xff0c;区块链技术再次成为焦点。本次论坛以"区块链X"为主题&#xff0c;集结了行业专家、技术领袖、…

fyne的MultiLineEntry设置大小

MultiLineEntry设置大小 在另一篇文章讲过&#xff0c;放入border布局中&#xff0c;可以最大化MultiLineEntry。 这里再介绍另一种方法:SetMinRowsVisible() func (e *Entry) SetMinRowsVisible(count int) {e.multiLineRows counte.Refresh() }SetMinRowsVisible强制mult…

Typora(跨平台 Markdown 编辑器 )正版值得购买吗

Typora 是一款桌面 Markdown 编辑器&#xff0c;作为国人开发的优秀软件&#xff0c;一直深受用户的喜爱。 实时预览格式 Typora 是一款适配 Windows / macOS / Linux 平台的 Markdown 编辑器&#xff0c;编辑实时预览标记格式&#xff0c;所见即所得&#xff0c;轻巧而强大…

Linux kernel 与 设备树

Linux kernel 与 设备树 1 介绍1.1 概述1.2 发展历程1.3 各版本发布时间及特色1.4 Linux 单内核1.5 Linux 内核网址1.6 NXP 官方镜像与 野火 鲁班猫镜像的区别 2 Linux 内核组成2.1 进程管理2.2 内存管理2.3 文件系统2.4 设备管理2.5 网络功能 3 Linux 内核编译3.1 编译 Kernel…

llm学习-2(使用embedding和数据处理)

首先可以简单了解一下向量数据库相关知识&#xff1a; 向量数据库相关知识&#xff08;搬运学习&#xff0c;建议还是看原文&#xff0c;这个只是我自己的学习记录&#xff09;-CSDN博客 补充&#xff1a; 使用embedding API 文心千帆API Embedding-V1是基于百度文心大模型…

【STM32】GPIO复用和映射

1.什么叫管脚复用 STM32F4有很多的内置外设&#xff0c;这些外设的外部引脚都是与GPIO复用的。也就是说&#xff0c;一个GPIO如果可以复用为内置外设的功能引脚&#xff0c;那么当这个GPIO作为内置外设使用的时候&#xff0c;就叫做复用。 STM32F4系列微控制器IO引脚通过一个…

我使用 GPT-4o 帮我挑西瓜

在 5 月 15 日&#xff0c;OpenAI 旗下的大模型 GPT-4o 已经发布&#xff0c;那时网络上已经传开&#xff0c; 但很多小伙伴始终没有看到 GPT-4o 的体验选项。 在周五的时候&#xff0c;我组建的 ChatGPT 交流群的伙伴已经发现了 GPT-4o 这个选项了&#xff0c;是在没有充值升…