三木零 发表于 2022-3-21 15:27

关于scala编写flink存放数据到redis中

本帖最后由 三木零 于 2022-4-7 09:21 编辑

代码如下,是一个获取后台生成的数据,要求统计销售额度和订单数量
目的:把处理的数据存到redis中
但有个问题,在redisMapper中可以设置键和值
可我返回的是(sum, 1000, num, 1000)这样的数据
怎么在redis中存放成这样的形式
hashset: {
    sum: 1000
    num: 1000
}
============================代码部分========================================package com.sml

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

case class Test(sum: Double)

object Main {
    def main(args: Array): Unit = {
      // 获取一个系统的执行环境
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      // 填入配置,后边用
      val properties = new Properties()
      properties.setProperty("bootstrap.servers", "192.168.23.49:9092")
      properties.setProperty("zookeeper.connect", "192.168.23.49:2181")
      properties.setProperty("group.id", "test")

      // kafka的配置
      val kafkaSource = new FlinkKafkaConsumer("order", new SimpleStringSchema, properties)

      // 定义一个存储用户数的变量
      var num = 0

      // 添加一个source来获取kafka中主题的数据
      val dataStream = env.addSource(kafkaSource)
          // filter,设置哪些数据要进行处理
          .filter(x => {
            x.split(":")(0).equals("O")
          })
          // 进行具体的处理
          .map(x => {
            num += 1
            Test(x.split(",")(3).toDouble)
          })

      // 连接redis的相关配置
      val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
          .setHost("master")
          .setPassword("123456")
          .build()

      // 添加一个sink把处理好的数据存入redis中
      dataStream.map(x => {
            ("sum", x.sum)
      })
          .keyBy(_._1)
          .sum(1)
          .map(x => {
            println((x._1, x._2.toString))
            (x._1, x._2.toString)
          })

      // 把数据存入到redis中
      dataStream.addSink(new RedisSink[(String, String)](config, new RedisExampleMapper))

      // 执行配置好的环境
      env.execute()
    }
}

class RedisExampleMapper extends RedisMapper[(String, String)] {
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, "order")
    }

    override def getKeyFromData(t: (String, String)): String = t._1

    override def getValueFromData(t: (String, String)): String = t._2
}

勉旃 发表于 2022-3-21 19:26

转成元组按下标取值后组装,以hash类型插入redis

三木零 发表于 2022-3-21 20:14

勉旃 发表于 2022-3-21 19:26
转成元组按下标取值后组装,以hash类型插入redis

但是他的Mapper不是只有getKeyFromData和getValueFromData吗,他是怎么去存两双键值对呢

三木零 发表于 2022-3-22 09:02

已经解决了,只需要在程序中多加几个.map().addSink(),处理出不同的数据再存到redis中就可以了
页: [1]
查看完整版本: 关于scala编写flink存放数据到redis中