吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1301|回复: 3
收起左侧

[已解决] 关于scala编写flink存放数据到redis中

[复制链接]
三木零 发表于 2022-3-21 15:27
本帖最后由 三木零 于 2022-4-7 09:21 编辑

代码如下,是一个获取后台生成的数据,要求统计销售额度和订单数量
目的:把处理的数据存到redis中
但有个问题,在redisMapper中可以设置键和值
可我返回的是(sum, 1000, num, 1000)这样的数据
怎么在redis中存放成这样的形式
[Asm] 纯文本查看 复制代码
hashset: {
    sum: 1000
    num: 1000
}

============================代码部分========================================
[Scala] 纯文本查看 复制代码
package com.sml

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

case class Test(sum: Double)

object Main {
    def main(args: Array[String]): Unit = {
        // 获取一个系统的执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // 填入配置,后边用
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "192.168.23.49:9092")
        properties.setProperty("zookeeper.connect", "192.168.23.49:2181")
        properties.setProperty("group.id", "test")

        // kafka的配置
        val kafkaSource = new FlinkKafkaConsumer("order", new SimpleStringSchema, properties)

        // 定义一个存储用户数的变量
        var num = 0

        // 添加一个source来获取kafka中主题的数据
        val dataStream = env.addSource(kafkaSource)
          // filter,设置哪些数据要进行处理
          .filter(x => {
              x.split(":")(0).equals("O")
          })
          // 进行具体的处理
          .map(x => {
              num += 1
              Test(x.split(",")(3).toDouble)
          })

        // 连接redis的相关配置
        val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
          .setHost("master")
          .setPassword("123456")
          .build()

        // 添加一个sink把处理好的数据存入redis中
        dataStream.map(x => {
            ("sum", x.sum)
        })
          .keyBy(_._1)
          .sum(1)
          .map(x => {
              println((x._1, x._2.toString))
              (x._1, x._2.toString)
          })

        // 把数据存入到redis中
        dataStream.addSink(new RedisSink[(String, String)](config, new RedisExampleMapper))

        // 执行配置好的环境
        env.execute()
    }
}

class RedisExampleMapper extends RedisMapper[(String, String)] {
    override def getCommandDescription: RedisCommandDescription = {
        new RedisCommandDescription(RedisCommand.HSET, "order")
    }

    override def getKeyFromData(t: (String, String)): String = t._1

    override def getValueFromData(t: (String, String)): String = t._2
}

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

勉旃 发表于 2022-3-21 19:26
转成元组按下标取值后组装,以hash类型插入redis
 楼主| 三木零 发表于 2022-3-21 20:14
勉旃 发表于 2022-3-21 19:26
转成元组按下标取值后组装,以hash类型插入redis

但是他的Mapper不是只有getKeyFromData和getValueFromData吗,他是怎么去存两双键值对呢
 楼主| 三木零 发表于 2022-3-22 09:02
已经解决了,只需要在程序中多加几个.map().addSink(),处理出不同的数据再存到redis中就可以了
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-11-25 15:45

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表