百木园-与人分享,
就是让自己快乐。

Flink-出租车-基于 DataStream API 计算每小时赚取最多小费的司机

案例来源 https://github.com/apache/flink-training/blob/release-1.14/hourly-tips/README_zh.md

案例介绍

基于出租车付费事件流计算出每小时赚取最多小费的司机,最简单的方法是通过两个步骤来解决这个问题:首先使用一个小时长的窗口来计算每个司机在一小时内的总小费,然后从该窗口结果流中找到每小时总小费最多的司机。

结果输出:
每小时产生一个 HourlyTip对象 记录的数据流。 这个记录应包含该小时结束时的时间戳、 该小时内获得小费最多的司机的 driverId 以及他的实际小费总数。

public class HourlyTip {

    /**
     * 小时结束时的时间戳
     */
    private Long eventTime;

    /**
     * 司机id driverId
     */
    private Long driverId;

    /**
     * 该小时获得的小费总数
     */
    private Float tips;

}

核心代码

       // 初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 定义出租车-车费数据源
        KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
                .setBootstrapServers(\"192.168.0.192:9092\")
                .setTopics(\"TOPIC_FARE\")
                .setGroupId(\"TEST_GROUP\")
                .setClientIdPrefix(\"fare\") // 避免kafka clientId重复
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new TaxiFareDeserialization())
                .build();

        DataStreamSource<TaxiFare> fareStream = env.fromSource(fareSource, WatermarkStrategy.<TaxiFare>forMonotonousTimestamps().withTimestampAssigner((fare, t) -> fare.getStartTime()), \"fare source\");

        // 按司机分组,对每小时内的数据进行统计,求出每个司机每小时的总小费
        SingleOutputStreamOperator<HourlyTip> hourlyTipsStream = fareStream.keyBy(TaxiFare::getDriverId)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .process(new AddTipsFunction());


        /**
         *  window和windowAll的区别
         *
         *  keyBy后数据分流,window是把不同的key分开聚合成窗口
         *      而windowAll是把所有的key都聚合起来,所以windowAll的并行度只能为1,而window可以有多个并行度
         *
         */


        // 把所有key汇总起来,找出每个小时总小费最多的司机
        SingleOutputStreamOperator<HourlyTip> hourlyMaxStream = hourlyTipsStream.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).max(\"tips\");


        hourlyMaxStream.addSink(new PrintSinkFunction<>());

        env.execute(\"Hourly Tips\");

完整代码

https://github.com/Mr-LuXiaoHua/study-flink

代码入口  com.example.datastream.hourlytips.HourlyTipsJob

来源:https://www.cnblogs.com/luxh/p/16434821.html
本站部分图文来源于网络,如有侵权请联系删除。

未经允许不得转载:百木园 » Flink-出租车-基于 DataStream API 计算每小时赚取最多小费的司机

相关推荐

  • 暂无文章