本帖最后由 三木零 于 2022-4-7 09:21 编辑
代码如下,是一个获取后台生成的数据,要求统计销售额度和订单数量
目的:把处理的数据存到redis中
但有个问题,在redisMapper中可以设置键和值
可我返回的是(sum, 1000, num, 1000)这样的数据
怎么在redis中存放成这样的形式
[Asm] 纯文本查看 复制代码 hashset: {
sum: 1000
num: 1000
}
============================代码部分========================================[Scala] 纯文本查看 复制代码 package com.sml
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
case class Test(sum: Double)
object Main {
def main(args: Array[String]): Unit = {
// 获取一个系统的执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 填入配置,后边用
val properties = new Properties()
properties.setProperty("bootstrap.servers", "192.168.23.49:9092")
properties.setProperty("zookeeper.connect", "192.168.23.49:2181")
properties.setProperty("group.id", "test")
// kafka的配置
val kafkaSource = new FlinkKafkaConsumer("order", new SimpleStringSchema, properties)
// 定义一个存储用户数的变量
var num = 0
// 添加一个source来获取kafka中主题的数据
val dataStream = env.addSource(kafkaSource)
// filter,设置哪些数据要进行处理
.filter(x => {
x.split(":")(0).equals("O")
})
// 进行具体的处理
.map(x => {
num += 1
Test(x.split(",")(3).toDouble)
})
// 连接redis的相关配置
val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
.setHost("master")
.setPassword("123456")
.build()
// 添加一个sink把处理好的数据存入redis中
dataStream.map(x => {
("sum", x.sum)
})
.keyBy(_._1)
.sum(1)
.map(x => {
println((x._1, x._2.toString))
(x._1, x._2.toString)
})
// 把数据存入到redis中
dataStream.addSink(new RedisSink[(String, String)](config, new RedisExampleMapper))
// 执行配置好的环境
env.execute()
}
}
class RedisExampleMapper extends RedisMapper[(String, String)] {
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, "order")
}
override def getKeyFromData(t: (String, String)): String = t._1
override def getValueFromData(t: (String, String)): String = t._2
}
|