大数剧-flink-高级编程A
- flink开发source、operator、sink、状态与容错
一、flink开发source、operator、sink
1. 计算模型
2.DataSource
输入Controlling Latency (控制延迟)
默认情况下,流中的元素并不会一个一个的在网络中传输(这会导致不必要的网络流量消耗) ,而是缓存起来,缓存的大小可以在Flink的配置文件、 ExecutionEnvironment、在某个算子上进行配置(默认100ms)
· 好处:提高吞吐
· 坏处:增加了延迟
· 如何把握平衡
- 为了最大吞吐量,可以设置setBufferTimeout(-1),这会移除timeout机制,缓存中的数据一满就会被发送
- 为了最小的延迟,可以将超时设置为接近0的数(例如5或者10ms)
- 缓存的超时不要设置为0,因为设置为0会带来一些性能的损耗
内置数据源
基于文件
env**.**readTextFile**(**"file:///path"**)** env**.**readFile**(**inputFormat**,** "file:///path"**);**
基于Socket
env**.**socketTextStream**(**"localhost"**,** 6666**,** '\n'**)**
基于Collection
env.socketTextStream("localhost", 6666, '\n')import org.apache.flink.api.scala._ env.fromCollection(List(1,2,3)) env.fromElements(1,2,3) env.generateSequence(0, 1000) #不需要隐式转换
自定义数据源
- 实现SourceFunction(非并行的)
示例代码:
function:
package cn.zhanghub.source;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class FileCountryDictSourceFunction implements SourceFunction<String> {
private String md5 = null;
private Boolean isCancel = true;
private Integer interval = 10000;
@Override
public void run(SourceContext<String> ctx) throws Exception {
Path pathString = new Path("hdfs://ns1/user/qingniu/country_data");
Configuration hadoopConf = new Configuration();
FileSystem fs = FileSystem.get(hadoopConf);
while (isCancel) {
if(!fs.exists(pathString)){
Thread.sleep(interval);
continue;
}
FileChecksum fileChecksum = fs.getFileChecksum(pathString);
String md5Str = fileChecksum.toString();
String currentMd5 = md5Str.substring(md5Str.indexOf(":") + 1);
if (!currentMd5.equals(md5)) {
FSDataInputStream open = fs.open(pathString);
BufferedReader reader = new BufferedReader(new InputStreamReader(open));
String line = reader.readLine();
while (line != null) {
ctx.collect(line);
line = reader.readLine();
}
reader.close();
md5 = currentMd5;
}
Thread.sleep(interval);
}
}
@Override
public void cancel() {
isCancel = false;
}
}
运行时:
package cn.zhanghub.source;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FileSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stringDataStreamSource = env.addSource(new FileCountryDictSourceFunction());
stringDataStreamSource.print();
env.execute();
}
}
- 实现ParallelSourceFunction与RichParallelSourceFunction(并行的)
以Kafka-connector-source为代表
· 基于Kafka 的partition 机制,Flink实现了并行化数据切分
· Flink 可以消费Kafka的topic,和sink数据到Kafka
· 出现失败时,flink通过checkpoint机制来协调Kafka来恢复应用(通过设置kafka的offset)
引入依赖:
flink支持的kafka版本对比:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.9.3</version>
<scope>compile</scope>
</dependency>
flink支持的kafka版本对比:
Maven Dependency | 支持自 | Class name | Kafka版本 | 说明 |
---|---|---|---|---|
flink-connector-kafka- 0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08 FlinkKafkaProducer08 | 0.8.x | 内部使用kakfa的 SimpleConsumer API 。 Flink把Offset提交给Zookeeper |
flink-connector-kafka- 0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09 FlinkKafkaProducer09 | 0.9.x | 使用kafka的new Consumer API Kafka. |
flink-connector-kafka- 0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010 FlinkKafkaProducer010 | 0.10.x | 生产和消费支持 Kafka messages with timestamps |
1).Flink KafkaConsumer 的 ** **Source API
1.FlinkKafkaConsumer010创建方式:
FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props)
三个构造参数:
要消费的topic(topic name / topic names/正表达式)
DeserializationSchema / KeyedDeserializationSchema(反序列化Kafka中的数据))
Kafka consumer的属性,其中三个属性必须提供:
- bootstrap.servers(逗号分隔的Kafka broker列表)
- zookeeper.connect(逗号分隔的Zookeeper server列表,仅Kafka 0.8需要))
- group.id(consumer group id)
2.反序列化Schema类型
作用:对kafka里获取的二进制数据进行反序列化
FlinkKafkaConsumer需要知道如何将Kafka中的二进制数据转换成Java/Scala对象,DeserializationSchema定义了该转换模式,通过T deserialize(byte[] message)
FlinkKafkaConsumer从kafka获取的每条消息都会通过DeserializationSchema的T deserialize(byte[] message)反序列化处理
反序列化Schema类型(接口):
- DeserializationSchema(只反序列化value)
- KeyedDeserializationSchema
3.常见反序列化Schema
- SimpleStringSchema
- JSONDeserializationSchema / JSONKeyValueDeserializationSchema
- TypeInformationSerializationSchema / TypeInformationKeyValueSerializationSchema
- AvroDeserializationSchema
4.自定义反序列化Schema:
- 实现DeserializationSchema与KeyedDeserializationSchema接口
DeserializationSchema:
KeyedDeserializationSchema:
bean:
package cn.zhanghub.source;
public class HainiuKafkaRecord {
private String record;
public HainiuKafkaRecord(String record) {
this.record = record;
}
public String getRecord() {
return record;
}
public void setRecord(String record) {
this.record = record;
}
}
schema:
package cn.zhanghub.source;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
public class HainiuKafkaRecordSchema implements DeserializationSchema<HainiuKafkaRecord> {
@Override
public HainiuKafkaRecord deserialize(byte[] message) throws IOException {
HainiuKafkaRecord hainiuKafkaRecord = new HainiuKafkaRecord(new String(message));
return hainiuKafkaRecord;
}
@Override
public boolean isEndOfStream(HainiuKafkaRecord nextElement) {
return false;
}
@Override
public TypeInformation<HainiuKafkaRecord> getProducedType() {
return TypeInformation.of(HainiuKafkaRecord.class);
}
}
5.FlinkKafkaConsumer010最简样版代码
6.FlinkKafkaConsumer消费
消费模式 | 说明 | |
---|---|---|
setStartFromEarliest | 从队头开始,最早的记录 | 内部的Consumer提交到Kafka/zk中的偏移量将被忽略 |
setStartFromLatest | 从队尾开始,最新的记录 | |
setStartFromGroupOffsets() | 默认值,从当前消费组记录的偏移量开始,接着上次的偏移量消费 | 以Consumer提交到Kafka/zk中的偏移量最为起始位置开始消费, group.id设置在consumer的properties里; 如果没找到记录的偏移量,则使用consumer的properties的 auto.offset.reset设置的策略 |
setStartFromSpecificOffsets(Map<TopicPa rtition, Long>的参数) | 从指定的具体位置开始消费 | |
setStartFromTimestamp(long) | 从指定的时间戳开始消费 | 对于每个分区,时间戳大于或等于指定时间戳的记录将用作起始位 置。如果一个分区的最新记录早于时间戳,那么只需要从最新记录 中读取该分区。在此模式下,Kafka/zk中提交的偏移量将被忽略 |
注意
- kafka 0.8版本, consumer提交偏移量到zookeeper,后续版本提交到kafka(一个特殊的topic: __consumer_offsets)
7.动态Partition discovery
- Flink Kafka Consumer支持动态发现Kafka分区,且能保证exactly-once
- 默认禁止动态发现分区,把flink.partition-discovery.interval-millis设置大于0即可启用:
properties.setProperty(“flink.partition-discovery.interval-millis”, “30000”)
8.动态Topic discovery
- Flink Kafka Consumer支持动态发现Kafka Topic,仅限通过正则表达式指定topic的方式
- 默认禁止动态发现topic,把flink.partition-discovery.interval-millis设置大于0即可启用
示例代码:
package cn.zhanghub.source;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import java.util.Properties;
import java.util.regex.Pattern;
public class KafkaRichParallelSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
kafkaConsumerProps.setProperty("group.id", "qingniuflink");
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new SimpleStringSchema(), kafkaConsumerProps);
// FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>(Pattern.compile("flink_event_[0-9]"), new SimpleStringSchema(), kafkaConsumerProps);
// kafkaSource.setStartFromEarliest()
// kafkaSource.setStartFromGroupOffsets()
kafkaSource.setStartFromLatest();
DataStreamSource<String> kafkaInput = env.addSource(kafkaSource);
kafkaInput.print();
FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaBeanSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
DataStreamSource<HainiuKafkaRecord> kafkaBeanInput = env.addSource(kafkaBeanSource);
kafkaBeanInput.print();
env.execute();
}
}
3.transformations
下图展示了 Flink 中目前支持的主要几种流的类型,以及它们之间的转换关系。
DataStream
DataStream 是 Flink 流处理 API 中最核心的数据结构。它代表了一个运行在多个分区上的并行流。一个 DataStream 可以从 StreamExecutionEnvironment 通过env.addSource(SourceFunction) 获得。
DataStream 上的转换操作都是逐条的,比如 map(),flatMap(),filter()
自定义转换函数
1.函数
scala函数
data.flatMap(f => f.split(“ “))
java的lambda
data.flatMap(f -> f.split(“ “));
2.实现接口
text.flatMap(new FlatMapFunction[String,String] {
override def flatMap(value: String, out: Collector[String]) = {
val strings: Array[String] = value.split(" ")
for(s <- strings){
out.collect(s)
}
}
})data.flatMap(f -> f.split(" "));
3.Rich Functions
Rich Function中有非常有用的四个方法:open,close,getRuntimeContext和setRuntimecontext 这些功能在创建本地状态、获取广播变量、获取运行时信息(例如累加器和计数器)和迭代信息时非常有帮助。
示例代码:
import java.util.Properties
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerRecord}
class HainiuRichFlatMapFunction(topic:String,props:Properties) extends RichFlatMapFunction[String,Int]{
var producer:Producer[String,String] = _
override def open(parameters: Configuration): Unit = {
//创建kafka生产者
producer = new KafkaProducer[String,String](props)
}
override def close(): Unit = {
//关闭kafka生产者
producer.close()
}
override def flatMap(value: String, out: Collector[Int]): Unit = {
//使用RuntimeContext得到子线程ID,比如可以用于多线程写文件
println(getRuntimeContext.getIndexOfThisSubtask)
//发送数据到kafka
producer.send(new ProducerRecord[String,String](topic,value))
}
}
operators
1.connect 与 union (合并流)
- connect之后生成ConnectedStreams,会对两个流的数据应用不同的处理方法,并且双流之间可以共享状态(比如计数)。这在第一个流的输入会影响第二个流时, 会非常有用。union 合并多个流,新的流包含所有流的数据。
- union是DataStream → DataStream
- connect只能连接两个流,而union可以连接多于两个流
- connect连接的两个流类型可以不一致,而union连接的流的类型必须一致
示例代码:
union:
package cn.zhanghub.operator;
import cn.zhanghub.source.FileCountryDictSourceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class CountryCodeUnion {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
kafkaConsumerProps.setProperty("group.id", "qingniuflink");
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new SimpleStringSchema(), kafkaConsumerProps);
// kafkaSource.setStartFromEarliest()
// kafkaSource.setStartFromGroupOffsets()
kafkaSource.setStartFromLatest();
DataStreamSource<String> kafkainput = env.addSource(kafkaSource);
DataStream<String> union = countryDictSource.union(kafkainput);
SingleOutputStreamOperator<String> process = union.process(new ProcessFunction<String, String>() {
private Map<String, String> map = new HashMap<>();
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
String[] split = value.split("\t");
if (split.length > 1) {
map.put(split[0], split[1]);
out.collect(value);
} else {
String countryName = map.get(value);
String outStr = countryName == null ? "no match" : countryName;
out.collect(outStr);
}
}
});
process.print();
env.execute();
}
}
connect:
package cn.zhanghub.operator;
import cn.zhanghub.source.FileCountryDictSourceFunction;
import cn.zhanghub.source.HainiuKafkaRecord;
import cn.zhanghub.source.HainiuKafkaRecordSchema;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class CountryCodeConnect {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
kafkaConsumerProps.setProperty("group.id", "qingniuflink");
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
// kafkaSource.setStartFromEarliest()
// kafkaSource.setStartFromGroupOffsets()
kafkaSource.setStartFromLatest();
DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
ConnectedStreams<String, HainiuKafkaRecord> connect = countryDictSource.connect(kafkainput);
SingleOutputStreamOperator<String> connectInput = connect.process(new CoProcessFunction<String, HainiuKafkaRecord, String>() {
private Map<String, String> map = new HashMap<String, String>();
@Override
public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
String[] split = value.split("\t");
map.put(split[0], split[1]);
out.collect(value);
}
@Override
public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
String countryCode = value.getRecord();
String countryName = map.get(countryCode);
String outStr = countryName == null ? "no match" : countryName;
out.collect(outStr);
}
});
connectInput.print();
env.execute();
}
}
2.keyBy
含义: 根据指定的key进行分组(逻辑上把DataStream分成若干不相交的分区,key一样的event会 被划分到相同的partition,内部采用类似于hash分区来实现)
转换关系: DataStream → KeyedStream
使用场景: 分组(类比SQL中的分组)比如join,coGroup,keyBy,groupBy,Reduce,GroupReduce,Aggregate,Windows
KeyedStream
- KeyedStream用来表示根据指定的key进行分组的数据流。
- 一个KeyedStream可以通过调用DataStream.keyBy()来获得。
- 在KeyedStream上进行任何transformation都将转变回DataStream。
- 在实现中,KeyedStream会把key的信息传入到算子的函数中。
示例代码:
package cn.zhanghub.operator;
import cn.zhanghub.source.FileCountryDictSourceFunction;
import cn.zhanghub.source.HainiuKafkaRecord;
import cn.zhanghub.source.HainiuKafkaRecordSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class CountryCodeConnectKeyBy {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
kafkaConsumerProps.setProperty("group.id", "qingniuflink");
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
// kafkaSource.setStartFromEarliest()
// kafkaSource.setStartFromGroupOffsets()
kafkaSource.setStartFromLatest();
DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
KeyedStream<Tuple2<String, String>, String> countryDictKeyBy = countryDictSource.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split("\t");
return Tuple2.of(split[0], split[1]);
}
}).keyBy(new KeySelector<Tuple2<String, String>, String>() {
@Override
public String getKey(Tuple2<String, String> value) throws Exception {
return value.f0;
}
});
KeyedStream<HainiuKafkaRecord, String> record = kafkainput.keyBy(new KeySelector<HainiuKafkaRecord, String>() {
@Override
public String getKey(HainiuKafkaRecord value) throws Exception {
return value.getRecord();
}
});
ConnectedStreams<Tuple2<String, String>, HainiuKafkaRecord> connect = countryDictKeyBy.connect(record);
SingleOutputStreamOperator<String> connectInput = connect.process(new KeyedCoProcessFunction<String, Tuple2<String, String>, HainiuKafkaRecord, String>() {
private Map<String, String> map = new HashMap<String, String>();
@Override
public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
map.put(ctx.getCurrentKey(), value.f1);
out.collect(value.toString());
}
@Override
public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
String countryCode = ctx.getCurrentKey();
String countryName = map.get(countryCode);
String outStr = countryName == null ? "no match" : countryName;
out.collect(outStr);
}
});
connectInput.print();
env.execute();
}
}
Key的类型限制:
- 不能是没有覆盖hashCode方法的POJO(也就是bean)
- 不能是数组
POJO:
package cn.zhanghub.source;
public class HainiuKafkaRecord {
private String record;
public HainiuKafkaRecord(String record) {
this.record = record;
}
public String getRecord() {
return record;
}
public void setRecord(String record) {
this.record = record;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((record == null) ? 0 : record.hashCode());
return result;
}
}
示例代码:
package cn.zhanghub.operator;
import cn.zhanghub.source.FileCountryDictSourceFunction;
import cn.zhanghub.source.HainiuKafkaRecord;
import cn.zhanghub.source.HainiuKafkaRecordSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class CountryCodeConnectKeyByObject {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
kafkaConsumerProps.setProperty("group.id", "qingniuflink");
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
// kafkaSource.setStartFromEarliest()
// kafkaSource.setStartFromGroupOffsets()
kafkaSource.setStartFromLatest();
DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
KeyedStream<Tuple2<HainiuKafkaRecord, String>, HainiuKafkaRecord> countryDictKeyBy = countryDictSource.map(new MapFunction<String, Tuple2<HainiuKafkaRecord, String>>() {
@Override
public Tuple2<HainiuKafkaRecord, String> map(String value) throws Exception {
String[] split = value.split("\t");
return Tuple2.of(new HainiuKafkaRecord(new String(split[0])), split[1]);
}
}).keyBy(new KeySelector<Tuple2<HainiuKafkaRecord, String>, HainiuKafkaRecord>() {
@Override
public HainiuKafkaRecord getKey(Tuple2<HainiuKafkaRecord, String> value) throws Exception {
return value.f0;
}
});
KeyedStream<HainiuKafkaRecord, HainiuKafkaRecord> record = kafkainput.keyBy(new KeySelector<HainiuKafkaRecord, HainiuKafkaRecord>() {
@Override
public HainiuKafkaRecord getKey(HainiuKafkaRecord value) throws Exception {
return value;
}
});
ConnectedStreams<Tuple2<HainiuKafkaRecord, String>, HainiuKafkaRecord> connect = countryDictKeyBy.connect(record);
SingleOutputStreamOperator<String> connectInput = connect.process(new KeyedCoProcessFunction<HainiuKafkaRecord, Tuple2<HainiuKafkaRecord, String>, HainiuKafkaRecord, String>() {
private Map<String, String> map = new HashMap<String, String>();
@Override
public void processElement1(Tuple2<HainiuKafkaRecord, String> value, Context ctx, Collector<String> out) throws Exception {
String currentKey = ctx.getCurrentKey().getRecord();
map.put(currentKey, value.f1);
out.collect(value.toString());
}
@Override
public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
HainiuKafkaRecord currentKey = ctx.getCurrentKey();
String countryName = map.get(currentKey.getRecord());
String outStr = countryName == null ? "no match" : countryName;
out.collect(currentKey.toString() + "--" + outStr);
}
});
connectInput.print();
env.execute();
}
}
*可能会出现数据倾斜,可根据实际情况结合物理分区来解决
3.物理分区
算子间数据传递模式
- One-to-one streams 保持元素的分区和顺序
- Redistributing streams
改变流的分区策略取决于使用的算子
- keyBy()(re-partitions by hashing the key)
- broadcast()
- rebalance()(which re-partitions randomly)
都是Transformation,都可以改变分区
分区Transformation | 说明 |
---|---|
Random partitioning | 按均匀分布随机划分元素,网络开销往往比较大 dataStream.shuffle() |
Round-robin partitioning | 循环对元素进行分区,为每一个分区创建相等的负载,这在数据倾斜时非常有用的: dataStream.rebalance() |
Rescaling | 跟rebalance有点类似,但不是全局的,通过轮询调度将元素从上游的task一个子 集发送到下游task的一个子集: dataStream.rescale(); |
Broadcasting | 将元素广播到每个分区上 dataStream.broadcast(); |
Custom partitioning | dataStream.partitionCustom(partitioner, “someKey”) 或 dataStream.partitionCustom(partitioner, 0) |
4.解决数据倾斜
1).One-to-one streams 解决数据倾斜的方法:
rebalance
- 含义:再平衡,用来减轻数据倾斜
- 转换关系: DataStream → DataStream
- 使用场景:处理数据倾斜,比如某个kafka的partition的数据比较多
示例代码:
val stream: DataStream[MyType] = env.addSource(new FlinkKafkaConsumer08[String](...))
val str1: DataStream[(String, MyType)] = stream.flatMap { ... }
val str2: DataStream[(String, MyType)] = str1.rebalance()
val str3: DataStream[AnotherType] = str2.map { ... }
上述 DataStream 上的转换在运行时会转换成如下的执行图:
如上图的执行图所示,DataStream 各个算子会并行运行,算子之间是数据流分区。如 Source 的第一个并行实例(S1)和 flatMap() 的第一个并行实例(m1)之间就是一个数据流分区。而在 flatMap() 和 map() 之间由于加了 rebalance(),它们之间的数据流分区就有3个子分区(m1的数据流向3个map()实例)。
rescale
- 原理:通过轮询调度将元素从上游的task一个子集发送到下游task的一个子集
- 转换关系:DataStream → DataStream
- 使用场景:数据传输都在一个TaskManager内,不需要通过网络。
原理:
第一个task并行度为2,第二个task并行度为6,第三个task并行度为2。从第一个task到第二个task,Src的 子集Src1 和 Map的子集Map1,2,3对应起来,Src1会以轮询调度的方式分别向Map1,2,3发送记录。 从第二个task到第三个task,Map的子集1,2,3对应Sink的子集1,这三个流的元素只会发送到Sink1。 假设我们每个TaskManager有三个Slot,并且我们开了SlotSharingGroup,那么通过rescale,所有的数据传输都在一个TaskManager内,不需要通过网络。
2).Redistributing streams 解决数据倾斜的方法:
自定义partitioner
转换关系:DataStream → DataStream
使用场景:自定义数据处理负载
实现方法:
- 实现org.apache.flink.api.common.functions.Partitioner接口
- 覆盖partition方法
- 设计算法返回partitionId
示例代码:
package cn.zhanghub.operator;
import cn.zhanghub.source.FileCountryDictSourceFunction;
import cn.zhanghub.source.HainiuKafkaRecord;
import cn.zhanghub.source.HainiuKafkaRecordSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class CountryCodeConnectCustomPartitioner {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
kafkaConsumerProps.setProperty("group.id", "qingniuflink");
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
// kafkaSource.setStartFromEarliest()
// kafkaSource.setStartFromGroupOffsets()
kafkaSource.setStartFromLatest();
DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
DataStream<Tuple2<String, String>> countryDictPartition = countryDictSource.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split("\t");
return Tuple2.of(split[0], split[1]);
}
}).partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
if (key.contains("CN")) {
return 0;
} else {
return 1;
}
}
}, new KeySelector<Tuple2<String, String>, String>() {
@Override
public String getKey(Tuple2<String, String> value) throws Exception {
return value.f0;
}
});
DataStream<HainiuKafkaRecord> recordPartition = kafkainput.partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
if (key.contains("CN")) {
return 0;
} else {
return 1;
}
}
}, new KeySelector<HainiuKafkaRecord, String>() {
@Override
public String getKey(HainiuKafkaRecord value) throws Exception {
return value.getRecord();
}
});
ConnectedStreams<Tuple2<String, String>, HainiuKafkaRecord> connect = countryDictPartition.connect(recordPartition);
SingleOutputStreamOperator<String> connectInput = connect.process(new CoProcessFunction<Tuple2<String, String>, HainiuKafkaRecord, String>() {
private Map<String, String> map = new HashMap<String, String>();
@Override
public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
map.put(value.f0, value.f1);
out.collect(value.toString());
}
@Override
public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
String countryCode = value.getRecord();
String countryName = map.get(countryCode);
String outStr = countryName == null ? "no match" : countryName;
out.collect(outStr);
}
});
connectInput.print();
env.execute();
}
}
使用parititoner解决数据倾斜
package cn.zhanghub.flink.operator;
import cn.zhanghub.flink.source.FileCountryDictSourceFunction;
import cn.zhanghub.flink.source.HainiuKafkaRecord;
import cn.zhanghub.flink.source.HainiuKafkaRecordSchema;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
public class CountryCodeConnectCustomPartitioner {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
kafkaConsumerProps.setProperty("group.id", "qingniuflink");
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
kafkaSource.setStartFromLatest();
DataStreamSource<HainiuKafkaRecord> kafkaInput = env.addSource(kafkaSource);
DataStream<HainiuKafkaRecord> kafka = kafkaInput.map(new MapFunction<HainiuKafkaRecord, HainiuKafkaRecord>() {
@Override
public HainiuKafkaRecord map(HainiuKafkaRecord value) throws Exception {
String record = value.getRecord();
Random random = new Random();
int i = random.nextInt(10);
return new HainiuKafkaRecord(i + "_" + record);
}
}).partitionCustom(new Partitioner<HainiuKafkaRecord>() {
@Override
public int partition(HainiuKafkaRecord key, int numPartitions) {
String[] s = key.getRecord().split("_");
String randomId = s[0];
return new Integer(randomId);
}
},
new KeySelector<HainiuKafkaRecord, HainiuKafkaRecord>() {
@Override
public HainiuKafkaRecord getKey(HainiuKafkaRecord value) throws Exception {
return value;
}
});
DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
DataStream<Tuple2<HainiuKafkaRecord, String>> countryDict = countryDictSource.flatMap(new FlatMapFunction<String, Tuple2<HainiuKafkaRecord, String>>() {
@Override
public void flatMap(String value, Collector<Tuple2<HainiuKafkaRecord, String>> out) throws Exception {
String[] split = value.split("\t");
String key = split[0];
String values = split[1];
for (int i = 0; i < 10; i++) {
String randomKey = i + "_" + key;
Tuple2<HainiuKafkaRecord, String> t2 = Tuple2.of(new HainiuKafkaRecord(randomKey), values);
out.collect(t2);
}
}
}).partitionCustom(new Partitioner<HainiuKafkaRecord>() {
@Override
public int partition(HainiuKafkaRecord key, int numPartitions) {
String[] s = key.getRecord().split("_");
String randomId = s[0];
return new Integer(randomId);
}
}, new KeySelector<Tuple2<HainiuKafkaRecord, String>, HainiuKafkaRecord>() {
@Override
public HainiuKafkaRecord getKey(Tuple2<HainiuKafkaRecord, String> value) throws Exception {
return value.f0;
}
});
ConnectedStreams<Tuple2<HainiuKafkaRecord, String>, HainiuKafkaRecord> connect = countryDict.connect(kafka);
SingleOutputStreamOperator<String> connectInput = connect.process(new CoProcessFunction<Tuple2<HainiuKafkaRecord, String>, HainiuKafkaRecord, String>() {
private Map<String, String> map = new HashMap<String, String>();
@Override
public void processElement1(Tuple2<HainiuKafkaRecord, String> value, Context ctx, Collector<String> out) throws Exception {
map.put(value.f0.getRecord(), value.f1);
out.collect(value.toString());
}
@Override
public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
String countryName = map.get(value.getRecord());
String outStr = countryName == null ? "no match" : countryName;
out.collect(outStr);
}
});
connectInput.print();
env.execute();
}
}
5.reduce 与 fold
- 分组之后当然要对分组之后的数据也就是KeyedStream进行各种聚合操作啦
- KeyedStream → DataStream
- 对于KeyedStream的聚合操作都是滚动的(rolling,在前面的状态基础上继续聚合),千万不要理解为批处理时的聚合操作(DataSet,其实也是滚动聚合,只不过他只把最后的结果给了我们)
聚合操作 | 意义 |
---|---|
reduce | KeyedStream流上,将上一次reduce的结果和本次的进行操作 |
fold | 对keyedStream流上的event进行连接操作 |
sum/min/minBy/max/maxBy | reduce的特例,min和minBy的区别是min返回的是一个最小值,而minBy返回的是其字段中包含最小值的元素(同样原理适用于max和maxBy) |
process | 底层的聚合操作 |
示例代码:
package cn.zhanghub.operator;
import cn.zhanghub.source.FileCountryDictSourceFunction;
import cn.zhanghub.source.HainiuKafkaRecord;
import cn.zhanghub.source.HainiuKafkaRecordSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class CountryCodeConnectKeyByCountryCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
kafkaConsumerProps.setProperty("group.id", "qingniuflink");
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
// kafkaSource.setStartFromEarliest()
// kafkaSource.setStartFromGroupOffsets()
kafkaSource.setStartFromLatest();
DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
KeyedStream<Tuple2<String, String>, String> countryDictKeyBy = countryDictSource.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split("\t");
return Tuple2.of(split[0], split[1]);
}
}).keyBy(new KeySelector<Tuple2<String, String>, String>() {
@Override
public String getKey(Tuple2<String, String> value) throws Exception {
return value.f0;
}
});
KeyedStream<HainiuKafkaRecord, String> record = kafkainput.keyBy(new KeySelector<HainiuKafkaRecord, String>() {
@Override
public String getKey(HainiuKafkaRecord value) throws Exception {
return value.getRecord();
}
});
ConnectedStreams<Tuple2<String, String>, HainiuKafkaRecord> connect = countryDictKeyBy.connect(record);
SingleOutputStreamOperator<Tuple2<String, Integer>> connectInput = connect.process(new KeyedCoProcessFunction<String, Tuple2<String, String>, HainiuKafkaRecord, Tuple2<String, Integer>>() {
private Map<String, String> map = new HashMap<String, String>();
@Override
public void processElement1(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
map.put(ctx.getCurrentKey(), value.f1);
out.collect(Tuple2.of(value.f0, 1));
}
@Override
public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String countryCode = ctx.getCurrentKey();
String countryName = map.get(countryCode);
String outStr = countryName == null ? "no match" : countryName;
out.collect(Tuple2.of(countryName.substring(0, countryName.indexOf(" ")), 1));
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = connectInput.keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
});
reduce.print();
env.execute();
}
}
6.OutputTab(拆分流)
- 只能在processFunction中使用
- 根据条件输出不同类型的数据
示例代码:
package cn.zhanghub.operator;
import cn.zhanghub.source.FileCountryDictSourceFunction;
import cn.zhanghub.source.HainiuKafkaRecord;
import cn.zhanghub.source.HainiuKafkaRecordSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class CountryCodeConnectKeyByCountryCountOutputTag {
private static final OutputTag<String> ot = new OutputTag<String>("china") {
};
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
kafkaConsumerProps.setProperty("group.id", "qingniuflink");
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
// kafkaSource.setStartFromEarliest()
// kafkaSource.setStartFromGroupOffsets()
kafkaSource.setStartFromLatest();
DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
KeyedStream<Tuple2<String, String>, String> countryDictKeyBy = countryDictSource.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split("\t");
return Tuple2.of(split[0], split[1]);
}
}).keyBy(new KeySelector<Tuple2<String, String>, String>() {
@Override
public String getKey(Tuple2<String, String> value) throws Exception {
return value.f0;
}
});
KeyedStream<HainiuKafkaRecord, String> record = kafkainput.keyBy(new KeySelector<HainiuKafkaRecord, String>() {
@Override
public String getKey(HainiuKafkaRecord value) throws Exception {
return value.getRecord();
}
});
ConnectedStreams<Tuple2<String, String>, HainiuKafkaRecord> connect = countryDictKeyBy.connect(record);
SingleOutputStreamOperator<Tuple2<String, Integer>> connectInput = connect.process(new KeyedCoProcessFunction<String, Tuple2<String, String>, HainiuKafkaRecord, Tuple2<String, Integer>>() {
private Map<String, String> map = new HashMap<String, String>();
@Override
public void processElement1(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
map.put(ctx.getCurrentKey(), value.f1);
out.collect(Tuple2.of(value.f0, 1));
}
@Override
public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String countryCode = ctx.getCurrentKey();
String countryName = map.get(countryCode);
String outStr = countryName == null ? "no match" : countryName;
if (outStr.contains("中国")) {
ctx.output(ot, outStr);
}
out.collect(Tuple2.of(countryName.substring(0, countryName.indexOf(" ")), 1));
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = connectInput.keyBy(1).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
});
reduce.print();
connectInput.getSideOutput(ot).print();
env.execute();
}
}
4.sink
内置数据输出
基于文件
#使用TextOutputFormat stream.writeAsText("/path/to/file") #使用CsvOutputFormat stream.writeAsCsv("/path/to/file")
基于Socket
stream.writeToSocket(host, port, SerializationSchema)
基于标准/错误输出
stream.writeToSocket(host, port, SerializationSchema)#注: 线上应用杜绝使用,采用抽样打印或者日志的方式 stream.print() stream.printToErr()
自定义数据输出
- 实现SinkFunction 或 继承RichSinkFunction(在没有自行改变并行度的情况下,是否并行取决其父operator)
1.实现RichSinkFunction
- 实现写入文件写入文件到HDFS
示例代码:
function:
package cn.zhanghub.sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.text.SimpleDateFormat;
import java.util.Date;
public class HDFSSinkFunction extends RichSinkFunction<String> {
private FileSystem fs = null;
private SimpleDateFormat sf = null;
private String pathStr = null;
@Override
public void open(Configuration parameters) throws Exception {
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
fs = FileSystem.get(conf);
sf = new SimpleDateFormat("yyyyMMddHH");
pathStr = "hdfs://ns1/user/qingniu/flinkstreaminghdfs";
}
@Override
public void close() throws Exception {
fs.close();
}
@Override
public void invoke(String value, Context context) throws Exception {
if (null != value) {
String format = sf.format(new Date());
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
StringBuilder sb = new StringBuilder();
sb.append(pathStr).append("/").append(indexOfThisSubtask).append("_").append(format);
Path path = new Path(sb.toString());
FSDataOutputStream fsd = null;
if (fs.exists(path)) {
fsd = fs.append(path);
} else {
fsd = fs.create(path);
}
fsd.write((value + "\n").getBytes("UTF-8"));
fsd.close();
}
}
}
运行类:
package cn.zhanghub.sink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class HDFSFile {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("localhost", 6666);
source.addSink(new HDFSSinkFunction());
env.execute();
}
}
2.以Kafka-connector-sink
1.FlinkFlinkKafkaProducer创建方式:
FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema)
FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig)
FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema)
FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig)
FlinkKafkaProducer010(String topicId,SerializationSchema<T> serializationSchema,Properties producerConfig,@Nullable FlinkKafkaPartitioner<T> customPartitioner)
FlinkKafkaProducer010(String topicId,KeyedSerializationSchema<T> serializationSchema,Properties producerConfig,@Nullable FlinkKafkaPartitioner<T> customPartitioner)
2.常见序列化Schema
· TypeInformationKeyValueSerializationSchema
· SimpleStringSchema
4.自定义序列化Schema:
- 实现KeyedSerializationSchema接口
示例代码:
case class KafkaEventP(message: String, eventTime: Long)
//自定义Serializer用来进行对象序列化到kafka中
class KafkaEventPKeyedSerializationSchema extends KeyedSerializationSchema[KafkaEventP] {
//序列化到kafka的key
override def serializeKey(element: KafkaEventP): Array[Byte] = {
element.message.getBytes()
}
//序列化到kafka的value
override def serializeValue(element: KafkaEventP): Array[Byte] = {
s"hainiu_processed_${element.message}".getBytes()
}
//得到目标topic可以不指定,因为在创建sink的时候已经指定
override def getTargetTopic(element: KafkaEventP): String = {
null
}
}
4.producerConfig
FlinkKafkaProducer内部KafkaProducer的配置
https://kafka.apache.org/documentation.html
示例代码:
Properties producerPropsSns = new Properties();
producerPropsSns.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
producerPropsSns.setProperty("retries", "3");
5.FlinkKafkaPartitioner
- 默认使用FlinkFixedPartitioner,即每个subtask的数据写到同一个Kafka partition中
- 自定义分区器:继承FlinkKafkaPartitioner
示例代码:
Partitioner:
package cn.zhanghub.sink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
public class HainiuFlinkPartitioner extends FlinkKafkaPartitioner {
@Override
public int partition(Object record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
return 1;
}
}
运行类:
package cn.zhanghub.sink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import java.util.Properties;
public class KafkaRichParallelSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("localhost", 6666);
Properties producerPropsSns = new Properties();
producerPropsSns.setProperty("bootstrap.servers", "s1.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
producerPropsSns.setProperty("retries", "3");
//FlinkKafkaProducer010类的构造函数支持自定义kafka的partitioner,
FlinkKafkaProducer010 kafkaOut = new FlinkKafkaProducer010<String>("flink_event_result",
new SimpleStringSchema(),
producerPropsSns,new HainiuFlinkPartitioner());
source.addSink(kafkaOut);
env.execute();
}
}
二、状态与容错
1.Flink恢复机制
1. 通过配置重生策略进行容错
· Flink支持不同的重启策略,这些策略控制在出现故障时如何重新启动job
Restart Strategy | 配置项 | 默认值 | 说明 |
---|---|---|---|
固定延迟(Fixed delay) | restart-strategy:fixed-delay | 如果超过最大尝试次数,作业最终会失败。在连续两次重启尝试之间等待固定的时间。 | |
restart-strategy.fixed-delay.attempts:3 | 1或者Integer.MAX_VALUE(启用checkpoint但未指定重启策略时) | ||
restart-strategy.fixed-delay.delay:10s | akka.ask.timeout或者10s(启用checkpoint但未指定重启策略时) | ||
失败率(Failure rate) | restart-strategy:failure-rate | 在失败后重新启动作业,但是当超过故障率(每个时间间隔的故障)时,作业最终会失败。在连续两次重启尝试之间等待固定的时间。 | |
restart-strategy:failure-rate.max-failures-per-interval:3 | 1 | ||
restart-strategy.failure-rate.failure-rateinterval:5min | 1 minute | ||
restart-strategy:failure-rate.delay:10s | akka.ask.timeout | ||
不恢复(No restart) | restart-strategy:none | 如果没有启用checkpointing,则使用无重启(no restart)策略。 |
· 重启策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在应用代码中动态指定,会覆盖全局配置
固定延迟的代码
env.setRestartStrategy(
RestartStrategies.fixedDelayRestart(
3,
Time.of(0,TimeUnit.SECONDS)
)
)
示例代码:
package cn.zhanghub.state;
import cn.zhanghub.source.FileCountryDictSourceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;
public class FileSourceRestart {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//恢复策略
env.setRestartStrategy(
RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(0, TimeUnit.SECONDS) // delay
)
);
DataStreamSource<String> stringDataStreamSource = env.addSource(new FileCountryDictSourceFunction());
stringDataStreamSource.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
System.out.println(value);
if(value.contains("中国")){
int a = 1/0;
}
return value;
}
}).print();
env.execute();
}
}
通过以上配置可以给你的程序增加生命条数,但是有个问题?能不能不仅增加生命条件,还能帮我存档?通过checkpoint加上state进行数据存档
2.Flink的state
1.什么是状态(State)
· Flink中的状态:一般指一个具体的task/operator某时刻在内存中的状态(例如某属性的值)
2.状态的作用
增量计算
- 聚合操作
- 机器学习训练模式
- 等等
容错
- Job故障重启
- 升级
3.没有状态的日子如何度过
Storm+Hbase,把这状态数据存放在Hbase中,计算的时候再次从Hbase读取状态数据,做更新在写入进去。这样就会有如下几个问题:
- 流计算任务和Hbase的数据存储有可能不在同一台机器上,导致性能会很差。这样经常会做远端的访问,走网络和存储
- 备份和恢复是比较困难,因为Hbase是没有回滚的,要做到Exactly onces很困难。在分布式环境下,如果程序出现故障,只能重启Storm,那么Hbase的数据也就无法回滚到之前的状态。比如广告计费的这种场景,Storm+Hbase是行不通的,出现的问题是钱可能就会多算,解决以上的办法是Storm+mysql,通过mysql的回滚解决一致性的问题。但是架构会变得非常复杂。性能也会很差,要commit确保数据的一致
- 对于storm而言状态数据的划分和动态扩容也是非常难做的,一个很严重的问题是所有用户都会strom上重复的做这些工作,比如搜索,广告都要在做一遍,由此限制了部门的业务发展
4.Flink有状态的计算
5.Flink丰富的状态访问和高效的容错机制
6.状态分类
- Operator State
- Keyed State
- 特殊的:Broadcast State(1.5开始)
1).Operator State
绑定到特定operator并行实例,每个operator的并行实例维护一个状态
与key无关
思考:一个并行度为3的source有几个状态(只考虑一个算子需要一个逻辑状态的情形)
支持的数据类型
- ListState
- ListState
例子:FlinkKafkaConsumer
- 每个Kafka Consumer实例都维护一个topic分区和偏移量的映射作为其操作状态。
2).Keyed State
- 基于KeyedStream之上的状态,dataStream.keyBy(),只能在作用于KeyedStrem上的function/Operator里使用
- KeyBy之后的Operator State,可理解为分区过的Operator State
- 每个并行keyed Operator的每个实例的每个key有一个Keyed State:即<parallel-operator-instance,key>就是一个唯一的状态,由于每个key属于一个keyed operator的并行实例,因此我们可以将其简单地理解为<operator,key>
- 思考:一个并行度为2的keyed Operator有多少个状态(只考虑一个算子需要一个逻辑状态的情形)
支持的数据结构
ValueState
:保留一个可以更新和检索的值 - update(T)
- value()
ListState
:保存一个元素列表 - add(T)
- addAll(List
) - get(T)
- clear()
ReducingState
:保存一个值,该值表示添加到该状态所有值的聚合。 - add(T)
AggregatingState<IN,OUT>:保存一个值,该值表示添加到该状态的所有值的聚合。(与ReducingState相反,聚合类型添加到该状态的元素可以有不同类型)
- add(T)
FoldingState<T,ACC>:不推荐使用
- add(T)
MapState<UK,UV>:保存一个映射列表
- put(UK,UV)
- putAll(Map<UK,UV>)
- get(UK)
3).注意:
- 状态不一定存储在内存,可能驻留在磁盘或其他地方
- 状态是使用RuntimeContext访问的,因此只能在Rich函数或process函数中访问
4).状态的表现形式
Keyed State和Operator State,可以以两种形式存在:原始状态和托管状态。
managed(托管状态):
- 托管状态是指Flink框架管理的状态,如ValueState,ListState,MapState等。
- 通过框架提供的接口来更新和管理状态的值
- 不需要序列化
raw(原始状态)
- 原始状态是由用户自行管理的具体的数据结构,Flink在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知
- 需要序列化
通常在DataStream上的状态推荐使用托管的状态,当用户自定义operator时,会使用到原始状态。
大多数都是托管状态,除非自定义实现。
3.Flink的checkpoint
1).状态容错
- 有了状态自然需要状态容错,否则状态就失去意义了
- Flink状态容错的机制就是checkpoint
2).状态容错示意图
3).状态容错示意图(checkpoint)
4).状态容错示意图(Restore)
- 恢复所有状态
- 设置source的位置(例如:Kafka的offset)
5).Checkpointing是什么
概念
- 所谓checkpoint,就是在某一时刻,将所有task的状态做一个快照(snapshot),然后存储到State Backend
- 一种连续性绘制数据流状态的机制(周期性的),该机制确保即使出现故障,程序的状态最终也将为数据流中的每一条记录提供exactly once的语意保证(只能保证flink系统内,对于sink和source需要依赖的外部的组件一同保证)
- 全局快照,持久化保存所有的task / operator的State
- 序列化数据集合
- 注意:可以通过开关使用at least once语意保证
- 注意:Checkpoint是通过分布式snapshot实现的,没有特殊说明时snapshot和checkpoint和back-up是一个意思
- 注意:State和Checkpointing不要搞混
特点:
- 轻量级容错机制
- 可异步
- 全量 vs 增量
- 失败情况可回滚至最近一次成功的checkpoint(自动)
- 周期性(无需人工干预)
4.Checkpointing与State的使用
启用**Checkpointing**
1).如何开启Checkpointing
Checkpointing默认是禁用的
注意:迭代job目前不支持Checkpoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
//advanced options:
//set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//checkpoints have to complete within one minute,or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
//allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
### 2).Checkpointing高级选项之checkpointMode
· CheckpointingMode.EXACTLY_ONCE
· CheckpointingMode.AT_LEAST_ONCE
· 如何选择:一般情况下选择EXACTLY_ONCE,除非场景要求极低的延迟(几毫秒)
· 注意:要想整个EXACTLY_ONCE,source和sink也要同时保证EXACTLY_ONCE
//set mode to exactly-once (this is the default) env**.**getCheckpointConfig**().**setCheckpointingMode**(**CheckpointingMode**.**EXACTLY_ONCE**);**
### 3).Checkpointing高级选项之保留策略
- 默认情况下,检查点不被保留,仅用于从故障中恢复作业。可以启用外部持久化检查点,同时指定保留策略
- - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作业取消时保留检查点。注意,在这种情况系,必须在取消后手动清理检查点状态。
- ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION当作业被cancel时,删除检查点。检查点状态仅在作业失败时可用。
```java
//enable externalized checkpoints which are retained after cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
4).Checkpointing其他高级选项
checkpointing的超时时间:超过时间没有完成则会被终止
//checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
checkpointing最小间隔:用于指定上一个checkpoint完成之后最小等多久可以出发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1
//make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
maxConcurrentCheckpoints:指定运行中的checkpoint最多可以有多少个(设定checkpointing最小间隔时本参数即为1)
//allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
failOnCheckpointingErrors用于指定在checkpoint发生异常的时候,是否应该fail该task,默认为true,如果设置为false,则task会拒绝checkpoint然后继续运行
env.getCheckpointConfig().setFailOnCheckpointingErrors(**true**);
注意,当开启checkpointing对重启(no restart)策略的影响:
- 如果没有启用checkpointing,就是不恢复数据。
- 如果启用了checkpointing,但没有配置重启策略,则使用固定延迟(fixed-delay)策略,其中尝试重启次数是Integer>MAX_VALUE
1.使用Operator State方式1:实现CheckpointedFunction
Stateful function(RichFunction)实现CheckpointedFunction接口,必须实现两个方法:
- Void snapshotState(FunctionSnapshotContext context) throws Exception
Checkpoint执行时调用
一般用于原始状态与托管状态进行交换
- Void initializeState(FunctionlnitializationContext context) throws Exception;(初始化以及恢复逻辑)
Stateful function第一次初始化时调用
Stateful function从较早的checkpoint恢复时调用
示例代码:
checkpointFunction:
package cn.zhanghub.state;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.List;
public class FileCountryDictSourceOperatorStateCheckpointedFunction implements SourceFunction<String>,CheckpointedFunction {
private String md5 = null;
private ListState<String> ls = null;
private Boolean isCancel = true;
private Integer interval = 1000;
@Override
public void run(SourceContext<String> ctx) throws Exception {
Path pathString = new Path("hdfs://ns1/user/qingniu/country_data");
Configuration hadoopConf = new Configuration();
FileSystem fs = FileSystem.get(hadoopConf);
while (isCancel) {
if(!fs.exists(pathString)){
Thread.sleep(interval);
continue;
}
System.out.println(md5);
FileChecksum fileChecksum = fs.getFileChecksum(pathString);
String md5Str = fileChecksum.toString();
String currentMd5 = md5Str.substring(md5Str.indexOf(":") + 1);
if (!currentMd5.equals(md5)) {
FSDataInputStream open = fs.open(pathString);
BufferedReader reader = new BufferedReader(new InputStreamReader(open));
String line = reader.readLine();
while (line != null) {
ctx.collect(line);
line = reader.readLine();
}
reader.close();
md5 = currentMd5;
}
Thread.sleep(interval);
}
}
@Override
public void cancel() {
isCancel = false;
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
ls.clear();
ls.add(md5);
System.out.println("snapshotState");
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<String> lsd = new ListStateDescriptor<String>("md5",String.class);
ls = context.getOperatorStateStore().getListState(lsd);
if (context.isRestored()){
Iterable<String> strings = ls.get();
String next = strings.iterator().next();
md5 = next;
}
}
}
运行程序:
package cn.zhanghub.state;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;
public class FileSourceOperatorStateCheckpointed {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//保存EXACTLY_ONCE
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//每次ck之间的间隔,不会重叠
checkpointConfig.setMinPauseBetweenCheckpoints(2000L);
//每次ck的超时时间
checkpointConfig.setCheckpointTimeout(20000L);
//如果ck执行失败,程序是否停止
checkpointConfig.setFailOnCheckpointingErrors(true);
//job在执行CANCE的时候是否删除ck数据
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//恢复策略
env.setRestartStrategy(
RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(0, TimeUnit.SECONDS) // delay
)
);
DataStreamSource<String> stringDataStreamSource = env.addSource(new FileCountryDictSourceOperatorStateCheckpointedFunction());
stringDataStreamSource.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
if(value.contains("中国")){
int a = 1/0;
}
return value;
}
}).print();
env.execute();
}
}
使用Operator State方式
2:实现ListCheckpointed
(这个接口自己本身就带了一个ListState)
Stateful function(RichFunction)实现ListCheckpointed接口,只用ListState的重分配方式
必须实现两个方法
- List
snapshotState(long checkpointld,long timestamp) throws Exception;
- List
Checkpoint执行时调用
这个方法的返回值,会被当成一个listState ,util.List->listState
- void restoreState(List
state) throws Exception;
- void restoreState(List
这个方法的传入参数,实际上snapshotState返回的listState -> util.List,所以在这个方法面能直接得到listState恢复的数据。
Stateful function从较早的checkpoint恢复时调用
示例代码:
ListCheckpointed:
package cn.zhanghub.state;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
public class FileCountryDictSourceOperatorStateListCheckpointedFunction implements SourceFunction<String>, ListCheckpointed<String> {
private String md5 = null;
private Boolean isCancel = true;
private Integer interval = 1000;
@Override
public void run(SourceContext<String> ctx) throws Exception {
Path pathString = new Path("hdfs://ns1/user/qingniu/country_data");
Configuration hadoopConf = new Configuration();
FileSystem fs = FileSystem.get(hadoopConf);
while (isCancel) {
if (!fs.exists(pathString)) {
Thread.sleep(interval);
continue;
}
System.out.println(md5);
FileChecksum fileChecksum = fs.getFileChecksum(pathString);
String md5Str = fileChecksum.toString();
String currentMd5 = md5Str.substring(md5Str.indexOf(":") + 1);
if (!currentMd5.equals(md5)) {
FSDataInputStream open = fs.open(pathString);
BufferedReader reader = new BufferedReader(new InputStreamReader(open));
String line = reader.readLine();
while (line != null) {
ctx.collect(line);
line = reader.readLine();
}
reader.close();
md5 = currentMd5;
}
Thread.sleep(interval);
}
}
@Override
public void cancel() {
isCancel = false;
}
@Override
public List<String> snapshotState(long checkpointId, long timestamp) throws Exception {
List<String> list = new ArrayList<>();
list.add(md5);
System.out.println("snapshotState");
return list;
}
@Override
public void restoreState(List<String> state) throws Exception {
md5 = state.get(0);
}
}
运行程序:
package cn.zhanghub.state;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;
public class FileSourceOperatorStateListCheckpointed {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//保存EXACTLY_ONCE
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//每次ck之间的间隔,不会重叠
checkpointConfig.setMinPauseBetweenCheckpoints(2000L);
//每次ck的超时时间
checkpointConfig.setCheckpointTimeout(20000L);
//如果ck执行失败,程序是否停止
checkpointConfig.setFailOnCheckpointingErrors(true);
//job在执行CANCE的时候是否删除ck数据
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// //恢复策略
env.setRestartStrategy(
RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(0, TimeUnit.SECONDS) // delay
)
);
DataStreamSource<String> stringDataStreamSource = env.addSource(new FileCountryDictSourceOperatorStateListCheckpointedFunction());
stringDataStreamSource.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
if(value.contains("中国")){
int a = 1/0;
}
return value;
}
}).print();
env.execute();
}
}
2. 使用 KeyedState ** **:
1.Keyed State之过期超时策略
- 由于Keyed State太多,所以flink提供了针对Keyed State TTL的设置
- 任何类型的keyed State都可以设置TTL。如果TTL已配置,且状态已过期,则将以最佳方式处理
- 所有State collection都支持条目级别的TTL,即list、map中的条目独立expire
- 用法:
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
Refresh策略(默认是OnCreateAndWrite):设置如何更新keyedState的最后访问时间
- StateTtlConfig.UpdateType.Disabled - 禁用TTL,永不过期
- StateTtlConfig.UpdateType.OnCreateAndWrite - 每次写操作均更新State的最后访问时间(Create、Update)
- StateTtlConfig.UpdateType.OnReadAndWrite - 每次读写操作均更新State的最后访问时间
状态可见性(默认是NeverReturnExpired):设置是否返回过期的值(过期尚未清理,此时正好被访问)
- StateTtlConfig.StateVisibility.NeverReturnExpired - 永不返回过期状态
- StateTtlConfig.StateVisibility.ReturnExpiredlfNotCleanedUp - 可以返回过期但尚未清理的状态值
TTL time等级
- setTimeCharacteristic(TimeCharacteristic timeCharacteristic)
- 目前只支持ProcessingTime
2.Keyed State之过期状态清理
清理策略
默认:已经过期的数据被显示读取时才会清理(可能会导致状态越来越大,后续版本会改进)
FULL_STATE_SCAN_SNAPSHOT:在checkpoint时清理full snapshot中的expired state
- CleanupFullSnapshot()
- 不适用于在RocksDB state backend上的incremental checkpointing
3.Keyed State TTL的注意事项
- 启用TTL增加后端状态存储的消耗
- 原来没启用TTL,后来启用TTL做恢复会将导致兼容性失败和StatmigrationException(反之也一样)
- TTL配置不是检查或保存点的一部分
示例代码:
package cn.zhanghub.state;
import cn.zhanghub.source.FileCountryDictSourceFunction;
import cn.zhanghub.source.HainiuKafkaRecord;
import cn.zhanghub.source.HainiuKafkaRecordSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class CountryCodeConnectKeyByKeyedState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//保存EXACTLY_ONCE
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//每次ck之间的间隔,不会重叠
checkpointConfig.setMinPauseBetweenCheckpoints(2000L);
//每次ck的超时时间
checkpointConfig.setCheckpointTimeout(20000L);
//如果ck执行失败,程序是否停止
checkpointConfig.setFailOnCheckpointingErrors(true);
//job在执行CANCE的时候是否删除ck数据
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//恢复策略
env.setRestartStrategy(
RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(0, TimeUnit.SECONDS) // delay
)
);
DataStreamSource<String> countryDictSource = env.addSource(new FileCountryDictSourceFunction());
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
kafkaConsumerProps.setProperty("group.id", "qingniuflink");
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
// kafkaSource.setStartFromEarliest()
// kafkaSource.setStartFromGroupOffsets()
kafkaSource.setStartFromLatest();
DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
KeyedStream<Tuple2<String, String>, String> countryDictKeyBy = countryDictSource.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split("\t");
return Tuple2.of(split[0], split[1]);
}
}).keyBy(new KeySelector<Tuple2<String, String>, String>() {
@Override
public String getKey(Tuple2<String, String> value) throws Exception {
return value.f0;
}
});
KeyedStream<HainiuKafkaRecord, String> record = kafkainput.keyBy(new KeySelector<HainiuKafkaRecord, String>() {
@Override
public String getKey(HainiuKafkaRecord value) throws Exception {
return value.getRecord();
}
});
ConnectedStreams<Tuple2<String, String>, HainiuKafkaRecord> connect = countryDictKeyBy.connect(record);
SingleOutputStreamOperator<String> connectInput = connect.process(new KeyedCoProcessFunction<String, Tuple2<String, String>, HainiuKafkaRecord, String>() {
private MapState<String,String> map = null;
@Override
public void open(Configuration parameters) throws Exception {
//keyState的TTL策略
StateTtlConfig ttlConfig = StateTtlConfig
//keyState的超时时间为100秒
.newBuilder(Time.seconds(100))
//当创建和更新时,重新计时超时时间
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
//失败时不返回keyState的值
//.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
//失败时返回keyState的值
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
//ttl的时间处理等级目前只支持ProcessingTime
.setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)
.build();
//从runtimeContext中获得ck时保存的状态
MapStateDescriptor<String,String> msd = new MapStateDescriptor<String, String>("map",String.class,String.class);
msd.enableTimeToLive(ttlConfig);
map = getRuntimeContext().getMapState(msd);
}
@Override
public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
map.put(ctx.getCurrentKey(), value.f1);
out.collect(value.toString());
}
@Override
public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
for(Map.Entry<String,String> m:map.entries()){
System.out.println(m.getKey());
System.out.println(m.getValue());
}
if(value.getRecord().equals("CN")){
int a = 1/0;
}
String countryCode = ctx.getCurrentKey();
String countryName = map.get(countryCode);
String outStr = countryName == null ? "no match" : countryName;
out.collect(outStr);
}
});
connectInput.print();
env.execute();
}
}
3.**使用BroadcastState:**
之前的程序是使用Distribute(keyBy)的方式让数据进行shuffle完成数据的join的,那shuffle可能会带来数据倾斜的问题,那怎么能不shuffle完成数据的join呢?使用广播状态,相当于spark的广播变量的作用。
1).为特殊场景而生
特殊场景:来自一个流的一些数据需要广播到所有下游任务,在这些任务中,这些数据被本地存储并用于处理另一个流上的所有传入元素。例如:一个低吞吐量流,其中包含一组规则,我们希望对来自另一个流的所有元素按规则进行计算。
典型应用:
- 常规事件流.connect(事件流)
- 常规配置流.connect(配置流)
2).Broadcast State使用套路(三步)
- 创建常规事件流DataStream / KeyedDataStream
- 创建BroadcastedStream:创建规则流 / 配置流(低吞吐)并广播
- 连接两个Stream,生成BroadcastConnectedStream并实现计算处理
- proccess(BroadcastProcessFunction and KeyedBroadcastProcessFunction)
3).BroadcastProcessFunction
public abstract class BroadcastProcessFunction<IN1,IN2,OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value,ReadOnlyContext ctx,Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value,Context ctx,Collector<OUT> out) throws Exception;
}
- processElement(…):负责处理非广播流中的传入元素,他可以使用与广播状态进行匹配
processBroadcastElement(…):负责处理广播流中的传入元素(例如规则),一般把广播流的元素添加到状态(MapState)里去备用,processElement处理业务数据时就可以使用(规则)
ReadOnlyContext和Context的不同
- ReadOnlyContext对Broadcast State有只读权限
- Context有读写权限
首先来个badCase
示例代码:
发射map类型的sourceFunction
package cn.zhanghub.source;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
public class FileCountryDictSourceMapFunction implements SourceFunction<Map<String,String>> {
private String md5 = null;
private Boolean isCancel = true;
private Integer interval = 10000;
@Override
public void run(SourceContext<Map<String,String>> ctx) throws Exception {
Path pathString = new Path("hdfs://ns1/user/qingniu/country_data");
Configuration hadoopConf = new Configuration();
FileSystem fs = FileSystem.get(hadoopConf);
while (isCancel) {
if(!fs.exists(pathString)){
Thread.sleep(interval);
continue;
}
FileChecksum fileChecksum = fs.getFileChecksum(pathString);
String md5Str = fileChecksum.toString();
String currentMd5 = md5Str.substring(md5Str.indexOf(":") + 1);
if (!currentMd5.equals(md5)) {
FSDataInputStream open = fs.open(pathString);
BufferedReader reader = new BufferedReader(new InputStreamReader(open));
String line = reader.readLine();
Map<String,String> map = new HashMap<>();
while (line != null) {
String[] split = line.split("\t");
map.put(split[0],split[1]);
line = reader.readLine();
}
ctx.collect(map);
reader.close();
md5 = currentMd5;
}
Thread.sleep(interval);
}
}
@Override
public void cancel() {
isCancel = false;
}
}
运行类:
package cn.zhanghub.operator;
import cn.zhanghub.source.FileCountryDictSourceMapFunction;
import cn.zhanghub.source.HainiuKafkaRecord;
import cn.zhanghub.source.HainiuKafkaRecordSchema;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class CountryCodeConnectMap {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//必须设置不然匹配不上
env.setParallelism(1);
DataStreamSource<Map<String, String>> countryDictSource = env.addSource(new FileCountryDictSourceMapFunction());
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
kafkaConsumerProps.setProperty("group.id", "qingniuflink");
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
// kafkaSource.setStartFromEarliest()
// kafkaSource.setStartFromGroupOffsets()
kafkaSource.setStartFromLatest();
DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
ConnectedStreams<Map<String, String>, HainiuKafkaRecord> connect = countryDictSource.connect(kafkainput);
SingleOutputStreamOperator<String> connectInput = connect.process(new CoProcessFunction<Map<String, String>, HainiuKafkaRecord, String>() {
private Map<String, String> map = new HashMap<String, String>();
@Override
public void processElement1(Map<String, String> value, Context ctx, Collector<String> out) throws Exception {
for (Map.Entry<String, String> entry : value.entrySet()) {
map.put(entry.getKey(), entry.getValue());
}
out.collect(value.toString());
}
@Override
public void processElement2(HainiuKafkaRecord value, Context ctx, Collector<String> out) throws Exception {
String countryCode = value.getRecord();
String countryName = map.get(countryCode);
String outStr = countryName == null ? "no match" : countryName;
out.collect(outStr);
}
});
connectInput.print();
env.execute();
}
}
使用广播状态进行优化:
public abstract class BroadcastProcessFunction<IN1,IN2,OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value,ReadOnlyContext ctx,Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value,Context ctx,Collector<OUT> out) throws Exception;
}
processElement(…):负责处理非广播流中的传入元素,他可以使用与广播状态进行匹配
processBroadcastElement(…):负责处理广播流中的传入元素(例如规则),一般把广播流的元素添加到状态(MapState)里去备用,processElement处理业务数据时就可以使用(规则)
ReadOnlyContext和Context的不同
- ReadOnlyContext对Broadcast State有只读权限
- Context有读写权限
示例代码:
package cn.zhanghub.state;
import cn.zhanghub.source.FileCountryDictSourceMapFunction;
import cn.zhanghub.source.HainiuKafkaRecord;
import cn.zhanghub.source.HainiuKafkaRecordSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class CountryCodeConnectMapBroadCast {
private static final MapStateDescriptor<String, String> msd = new MapStateDescriptor<>("countryCodeMap", String.class, String.class);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//保存EXACTLY_ONCE
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//每次ck之间的间隔,不会重叠
checkpointConfig.setMinPauseBetweenCheckpoints(2000L);
//每次ck的超时时间
checkpointConfig.setCheckpointTimeout(20000L);
//如果ck执行失败,程序是否停止
checkpointConfig.setFailOnCheckpointingErrors(true);
//job在执行CANCE的时候是否删除ck数据
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//指定保存ck的存储模式,这个是默认的
MemoryStateBackend stateBackend = new MemoryStateBackend(10 * 1024 * 1024, false);
//指定保存ck的存储模式
// FsStateBackend stateBackend = new FsStateBackend("file:///Users/leohe/Data/output/flink/checkpoints", true);
// RocksDBStateBackend stateBackend = new RocksDBStateBackend("file:///Users/leohe/Data/output/flink/checkpoints", true);
env.setStateBackend(stateBackend);
//恢复策略
env.setRestartStrategy(
RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(0, TimeUnit.SECONDS) // delay
)
);
DataStreamSource<Map<String, String>> countryDictSource = env.addSource(new FileCountryDictSourceMapFunction());
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
kafkaConsumerProps.setProperty("group.id", "qingniuflink");
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
// kafkaSource.setStartFromEarliest()
// kafkaSource.setStartFromGroupOffsets()
kafkaSource.setStartFromLatest();
DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
BroadcastStream<Map<String, String>> broadcastinput = countryDictSource.broadcast(msd);
BroadcastConnectedStream<HainiuKafkaRecord, Map<String, String>> broadcastConnect = kafkainput.connect(broadcastinput);
SingleOutputStreamOperator<String> broadcastConnectInput = broadcastConnect.process(new BroadcastProcessFunction<HainiuKafkaRecord, Map<String, String>, String>() {
//private Map<String, String> map = new HashMap<String, String>();
@Override
public void processElement(HainiuKafkaRecord value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
String countryCode = value.getRecord();
ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(msd);
String countryName = broadcastState.get(countryCode);
// String countryName = map.get(countryCode);
String outStr = countryName == null ? "no match" : countryName;
out.collect(outStr);
}
@Override
public void processBroadcastElement(Map<String, String> value, Context ctx, Collector<String> out) throws Exception {
BroadcastState<String, String> broadcastState = ctx.getBroadcastState(msd);
for (Map.Entry<String, String> entry : value.entrySet()) {
broadcastState.put(entry.getKey(), entry.getValue());
}
// for (Map.Entry<String, String> entry : value.entrySet()) {
// map.put(entry.getKey(), entry.getValue());
// }
out.collect(value.toString());
}
});
broadcastConnectInput.print();
env.execute();
}
}
广播状态的别一种使用方法,keyBy之后的:
4. KeyedBroadcastProcessFunction
processElement(…):负责处理非广播流中的传入元素
processBroadcastElement(…):负责处理广播流中的传入元素(例如规则),一般把广播流的元素添加到状态里去备用,processElement处理业务数据时就可以使用(规则)
ReadOnlyContext和Context的不同
- ReadOnlyContext对Broadcast State有只读权限
- Context有读写权限
示例代码:
package cn.zhanghub.state;
import cn.zhanghub.source.FileCountryDictSourceMapFunction;
import cn.zhanghub.source.HainiuKafkaRecord;
import cn.zhanghub.source.HainiuKafkaRecordSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class CountryCodeConnectMapKeyedBroadCast {
private static final MapStateDescriptor<String, String> msd = new MapStateDescriptor<>("countryCodeMap", String.class, String.class);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//保存EXACTLY_ONCE
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//每次ck之间的间隔,不会重叠
checkpointConfig.setMinPauseBetweenCheckpoints(2000L);
//每次ck的超时时间
checkpointConfig.setCheckpointTimeout(20000L);
//如果ck执行失败,程序是否停止
checkpointConfig.setFailOnCheckpointingErrors(true);
//job在执行CANCE的时候是否删除ck数据
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//指定保存ck的存储模式,这个是默认的
MemoryStateBackend stateBackend = new MemoryStateBackend(10 * 1024 * 1024, false);
//指定保存ck的存储模式
// FsStateBackend stateBackend = new FsStateBackend("file:///Users/leohe/Data/output/flink/checkpoints", true);
// RocksDBStateBackend stateBackend = new RocksDBStateBackend("file:///Users/leohe/Data/output/flink/checkpoints", true);
env.setStateBackend(stateBackend);
//恢复策略
env.setRestartStrategy(
RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(0, TimeUnit.SECONDS) // delay
)
);
DataStreamSource<Map<String, String>> countryDictSource = env.addSource(new FileCountryDictSourceMapFunction());
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers", "s1.hadoop:9092,s2.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
kafkaConsumerProps.setProperty("group.id", "qingniuflink");
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer010<HainiuKafkaRecord> kafkaSource = new FlinkKafkaConsumer010<>("flink_event", new HainiuKafkaRecordSchema(), kafkaConsumerProps);
// kafkaSource.setStartFromEarliest()
// kafkaSource.setStartFromGroupOffsets()
kafkaSource.setStartFromLatest();
DataStreamSource<HainiuKafkaRecord> kafkainput = env.addSource(kafkaSource);
KeyedStream<HainiuKafkaRecord, String> record = kafkainput.keyBy(new KeySelector<HainiuKafkaRecord, String>() {
@Override
public String getKey(HainiuKafkaRecord value) throws Exception {
return value.getRecord();
}
});
BroadcastStream<Map<String, String>> broadcastinput = countryDictSource.broadcast(msd);
BroadcastConnectedStream<HainiuKafkaRecord, Map<String, String>> broadcastConnect = record.connect(broadcastinput);
SingleOutputStreamOperator<String> broadcastConnectInput = broadcastConnect.process(new KeyedBroadcastProcessFunction<String, HainiuKafkaRecord, Map<String, String>, String>(){
//private Map<String, String> map = new HashMap<String, String>();
@Override
public void processElement(HainiuKafkaRecord value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
String countryCode = ctx.getCurrentKey();
ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(msd);
String countryName = broadcastState.get(countryCode);
// String countryName = map.get(countryCode);
String outStr = countryName == null ? "no match" : countryName;
out.collect(outStr);
}
@Override
public void processBroadcastElement(Map<String, String> value, Context ctx, Collector<String> out) throws Exception {
BroadcastState<String, String> broadcastState = ctx.getBroadcastState(msd);
for (Map.Entry<String, String> entry : value.entrySet()) {
broadcastState.put(entry.getKey(), entry.getValue());
}
// for (Map.Entry<String, String> entry : value.entrySet()) {
// map.put(entry.getKey(), entry.getValue());
// }
out.collect(value.toString());
}
});
broadcastConnectInput.print();
env.execute();
}
}
public class MyReduceFunctionWithKeyedState extends RichReduceFunction<Tuple2<String, Integer>> {
private ValueState<Integer> count;
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
count.update(t1.f1 + t2.f1);
return Tuple2.of(t1.f0, count.value());
}
@Override
public void open(Configuration parameters) throws Exception {
final ValueStateDescriptor<Integer> cs = new ValueStateDescriptor<>("cs", Integer.class);
/** 超时时间设置*/
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
cs.enableTimeToLive(ttlConfig);
count = getRuntimeContext().getState(cs);
}
}
注意事项
- 每个任务的广播状态的元素顺序有可能不一样
- Broadcast State 保存在内存中(并不在 RocksDB )
5.CheckPoint 原理(面试经常问)
- 通过往source 注入barrier
- barrier作为checkpoint的标志
1.Barrier
- 全局异步化是snapshot的核心机制
- Flink分布式快照的核心概念之一就是数据栅栏(barrier)。这些barrier被插入到数据流中,作为数据流的一部分和数据一起向下流动。Barrier不会干扰正常数据,数据严格有序。一个barrier把数据流分割成两部分:一部分进入到当前快照,另一部分进入下一个快照。每一个barrier都带有快照ID,并且barrier之前的数据都进入了此快照。Barrier不会干扰数据流处理,所以非常轻量。多个不同快照的多个barrier会在流中同时出现,即多个快照可能同时创建。
- Barrier在数据源端插入,当snapshot n的barrier插入后,系统会记录当前snapshot位置值 n(用Sn表示)。
- 例如,在Apache Kafka中,这个变量表示某个分区中最后一条数据的偏移量。这个位置值 Sn 会被发送到一个称为checkpoint cordinator的模块。(即Flink 的 JobManager)
2. 分布式环境下的 ck 原理:
在分布式情况下:
- operator在收到所有输入数据流中的barrier之后,在发射barrier到其输出流之前对其状态进行快照。此时,在barrier之前的数据对状态的更新已经完成,不会再依赖barrier之前数据。
- 然后它会向其所有输出流插入一个标识 snapshot n 的barrier。当sink operator (DAG流的终点)从其输入流接收到所有barrier n时,它向checkpoint coordinator 确认 snapshot n 已完成当,所有sink 都确认了这个快照,代表快照在分布式的情况被标识为整体完成。
由于快照可能非常大,所以后端存储系统可配置。默认是存储到JobManager的内存中,但是对于生产系统,需要配置成一个可靠的分布式存储系统(例如HDFS)。状态存储完成后,operator会确认其checkpoint完成,发射出barrier到后续输出流。
快照现在包含了:
- 对于并行输入数据源:快照创建时数据流中的位置偏移
- 对于operator:存储在快照中的状态指针
3.Barrier 多并行度(对齐), flink 怎么保证 Exactly Once
接收超过一个输入流的operator需要基于barrier对齐(align)输入。参见上图:
- operator 只要一接收到某个输入流的barrier n,它就不能继续处理此数据流后续的数据,直到operator接收到其余流的barrier n。否则会将属于snapshot n 的数据和snapshot n+1的搞混
- barrier n 所属的数据流先不处理,从这些数据流中接收到数据被放入接收缓存里(input buffer)
- 当从最后一个流中提取到barrier n 时,operator 会发射出所有等待向后发送的数据,然后发射snapshot n 所属的barrier
- 经过以上步骤,operator 恢复所有输入流数据的处理,优先处理输入缓存中的数据
4.Exactly Once vs. At Least Once
- 对齐就Exactly Once(两个Barrier之间的数据就像在一个事务里一样,sink收到所有barrier n 时提交任务),不对齐就At Least Once
- Flink提供了在 checkpoint 时关闭对齐的方法,当 operator 接收到一个 barrier 时,就会打一个快照,而不会等待其他 barrier。
- 跳过对齐操作使得即使在 barrier 到达时,Operator 依然继续处理输入。这就是说:operator 在 checkpoint n 创建之前,继续处理属于 checkpoint n+1 的数据。所以当异常恢复时,这部分数据就会重复,因为它们被包含在了 checkpoint n 中,同时也会在之后再次被处理。
- 注意:对齐操作只会发生在拥有多输入运算(join)或者多个输出的 operator(重分区、分流)的场景下。所以,对于 map(), flatmap(), fliter() 等的并行操作即使在至少一次的模式中仍然会保证严格一次。
5.使用Checkpointing的前提条件
在一定时间内可回溯的datasource(故障时可以回溯数据),常见的:
- 一般是可持久化的消息队列:例如Kafka、RabbitMQ、Amazon Kinesis、Google PubSub
- 也可以是文件系统:HDFS、S3、GFS、NFS、Ceph
可持久化存储State的存储系统,通常使用分布式文件系统(Checkpointing就是把job的所有状态都周期性持久化到存储里)
- 一般是HDFS、S3、GFS、NFS、Ceph
注意:如果想保存checkpointing的时候是exactly-once的,那也需要你的存储端支持幂特性/事务
- 一般是hbase的rowkey,redies的key或者mysql的事务
帮我把档存到那里?
6.State Backend
选择合适的State Backend
1.什么是State Backend
State Backend就是用来保存快照的地方
用来在Checkpointing机制中持久化所有状态的一致性快照,这些状态包括:
- 非用户定义的状态:例如,timers、非用户自定义的stateful operators(connectors,windows)
- 用户定义的状态:就是前面讲的用户自定义的stateful operato所使用的Keyed State and Operator State
2.目前Flink自带三个开箱即用State Backend:
1).MemoryStateBackend(默认)
- MemoryStateBackend在Java堆上维护状态。Key/value状态和窗口运算符使用哈希表存储值和计时器等
- Checkpoint时,MemoryStateBackend对State做一次快照,并在向JobManager发送Checkpoint确认完成的消息中带上此快照数据,然后快照就会存储在JobManager的堆内存中
- MemoryStateBackend可以使用异步的方式进行快照(默认开启),推荐使用异步的方式避免阻塞。如果不希望异步,可以在构造的时候传入false(也可以通过全局配置文件指定),如下
StateBackend backend = new MemoryStateBackend(1010241024,**false**); env.setStateBackend(backend);
限制
- 单个State的大小默认限制为5MB,可以在MemoryStateBackend的构造函数中增加
- 不论如何配置,State大小都无法大于akka.framesize(JobManager和TaskManager之间发送的最大消息的大小默认是10MB)
- JobManager必须有足够的内存大小
适用场景
- 本地开发和调试
- 小状态job,如只使用Map、FlatMap、Filter…或Kafka Consumer
2).FsStateBackend
- FsStateBackend需要配置一个文件系统的URL,如”hdfs://namenode:40010/flink/checkpoint”或”file:///data/flink/checkpoints”。
- FsStateBackend在TaskManager的内存中持有正在处理的数据。Checkpoint时将state snapshot 写入文件系统目录下的文件中。文件的路径等元数据会传递给JobManager,存在其内存中 (或者在HA模式下,存储在元数据checkpoint中)。
- FsStateBackend可以使用异步的方式进行快照(默认开启),推荐使用异步的方式避免阻塞。如果不希望异步可以在构造的时候传入false(也可以通过全局配置文件指定),如下:
StateBackend backend = new FsStateBackend(“hdfs://namenode:40010/flink/checkpoints”,**false**); env.setStateBackend(backend);
适用场景
- 大状态、长窗口、大键/值状态的job
- 所有高可用性的情况
3).RocksDBStateBackend
- RocksDBStateBackend需要配置一个文件系统的URL来,如”hdfs://namenode:40010/flink/checkpoint”或”file:///data/flink/checkpoints”
- RocksDBStateBackend将运行中的数据保存在RocksDB数据库中,(默认情况下)存储在TaskManager数据目录中,在Checkpoint时,整个RocksDB数据库将被Checkpointed到配置的文件系统和目录中。文件的路径等元数据会传递给JobManager,存在其内存中(或者在HA模式下,存储在元数据checkpoint中)。
- RocksDBStateBackend总是执行异步快照
StateBackend backend = new RocksDBStateBackend(“hdfs://namenode:40010/flink/checkpoints”); env.setStateBackend(backend);
限制
- RocksDB JNI API是基于byte[],因此key和value最大支持大小为2^31个字节(2GB)。RocksDB自身在支持较大value时候有问题(merge operations in RocksDB(e.g.ListState))
适用场景
- 超大状态,超长窗口、大键/值状态的job
- 所有高可用性的情况
与前两种状态后端对比:
- 目前只有RocksDBStateBackend支持增量checkpoint(默认全量)
- 状态保存在数据库中,即使用RockDB可以保存的状态量仅受可用磁盘空间量的限制,相比其他的状态后端可保存更大的状态,但开销更大(读/写需要反序列化/序列化去检索/存储状态),吞吐受到限制
· 使用RocksDBStateBackend特有配置
配置项 | 默认值 | 说明 |
---|---|---|
state.backend.rocksdb.localdir | (none) | |
state.backend.rocksdb.timer-service.factory | “HEAP” | 指定timer service状态存储在哪里, HEAP/ROCKSDB |
· 代码中配置RocksDBStateBackend(可覆盖全局配置)
StateBackend backend = new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints",true);
env.setStateBackend(backend);
· 需要单独引入POM依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
- 默认情况下(MemoryStateBackend):State保存在taskmanager的内存中,checkpoint存储在JobManager的内存中
4).StateBackend总结
1).配置StateBackend
- 全局配置(配置文件conf/flink-conf.yaml)
# The backend that will be used to store operator state checkpoints
state.backend: filesystem
#Directory.for storing checkpoints
state.checkpoints.dir: hdfs:namenode:40010/flink/checkpoints
- 每个job单独配置State Backend(可覆盖全局配置)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
示例代码:
package cn.zhanghub.state;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;
public class FileSourceOperatorStateListCheckpointedStateBackend {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//保存EXACTLY_ONCE
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//每次ck之间的间隔,不会重叠
checkpointConfig.setMinPauseBetweenCheckpoints(2000L);
//每次ck的超时时间
checkpointConfig.setCheckpointTimeout(20000L);
//如果ck执行失败,程序是否停止
checkpointConfig.setFailOnCheckpointingErrors(true);
//job在执行CANCE的时候是否删除ck数据
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//指定保存ck的存储模式,这个是默认的
MemoryStateBackend stateBackend = new MemoryStateBackend(10 * 1024 * 1024, false);
//指定保存ck的存储模式
// FsStateBackend stateBackend = new FsStateBackend("file:///Users/leohe/Data/output/flink/checkpoints", true);
// RocksDBStateBackend stateBackend = new RocksDBStateBackend("file:///Users/leohe/Data/output/flink/checkpoints", true);
env.setStateBackend(stateBackend);
//恢复策略
env.setRestartStrategy(
RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(0, TimeUnit.SECONDS) // delay
)
);
DataStreamSource<String> stringDataStreamSource = env.addSource(new FileCountryDictSourceOperatorStateListCheckpointedFunction());
stringDataStreamSource.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
if (value.contains("中国")) {
int a = 1 / 0;
}
return value;
}
}).print();
env.execute();
}
}
Checkpointing的默认全局配置(conf/flink-conf.yaml)
7.Savepoint
配置项 | 默认值 | 说明 |
---|---|---|
state.backend | (none) | • 用于指定checkpoint state存储的backend, 默认为none; • 目前支持的backends是’jobmanager’,’filesystem’,’rocksdb’ • 也可以使用它们的工厂类的全限定名: 例如org.apache.flink.runtime.state.filesystem. FsStateBackendFactory • 如果没指定, 默认使用jobmanager, 即MemoryStateBackend |
state.backend.async | true | 用于指定backend是否使用异步, 有些不支持async或者只支持async的state backend可能会忽略这个参数 |
state.backend.fs.memory-threshold | 1024 | 用于指定存储state的files大小阈值, 如果小于该值则会存储在root checkpoint metadata file |
state.backend.incremental | false | 用于指定是否采用增量checkpoint, 有些不支持增量checkpoint的backend会忽略该配置; 目前只有rocksdb支持 |
state.backend.local-recovery | false | |
state.checkpoints.dir | (none) | • 用于指定checkpoint的data files和meta data存储的目录, 该目录必须对所有参与的TaskManagers及JobManagers可见(有读写权限) • 例如: hdfs://namenode-host:port/flink-checkpoints |
state.checkpoints.num-retained | 1 | 用于指定保留的已完成的checkpoints最大个数 |
state.savepoints.dir | (none) | • 用于指定savepoints的默认目录 • 例如: hdfs://namenode-host:port/flink-checkpoints |
taskmanager.state.local.root-dirs | (none) |
1).Savepoint概念
概念
- savepoint可以理解为是一种特殊的checkpoint,savepoint就是指向checkpoint的一个指针,实际上也是使用通过checkpointing机制创建的streaming job的一致性快照,可以保存数据源的offset、并行操作状态也就是流处理过程中的状态历史版本。需要手动触发,而且不会过期,不会被覆盖,除非手动删除。正常情况下的线上环境是不需要设置savepoint的。除非对job或集群做出重大改动的时候, 需要进行测试运行。
- 可以从应用在过去的任意做了savepoint的时刻开始继续消费,具有可以replay的功能
Savepoint由两部分组成:
- 数据目录:稳定存储上的目录,里面的二进制文件是streaming job状态的快照
- 元数据文件:指向数据目录中属于当前Savepoint的数据文件的指针(绝对路径)
与Checkpoint的区别:Savepoint相当于备份(类比数据库备份)、Checkpoint相当于recovery log
- Checkpoint是Flink自动创建的”recovery log”用于故障自动恢复,由Flink创建,不需要用户交互。用户cancel作业时就删除,除非启动了保留机制(External Checkpoint)
- Savepoint由用户创建,拥有和删除,保存点在作业终止后仍然存在。
作用
- job开发新版本(更改job graph、更改并行度等等),应用重新发布
- Flink版本的更新
- 业务迁移,集群需要迁移,不容许数据丢失
2).区分Checkpoint、External Checkpoint、Savepoint
概念 | 描述 | 使用场景 |
---|---|---|
Checkpoint | 定期、自动的对job下的所有状态多快照并存储,会过期,仅用于从故障中恢复(重启策略)。当job cancel之后会被删除。 | 应用内部restarting时使用 |
External Checkpoint | 一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint处理;属于checkpoint的范畴 | |
Savepoint | 用于创建的job state的“备份”,是一种特殊的checkpoint,只不过不像checkpoint定期的从系统中去触发的,它是用户通过命令触发,存储格式和checkpoint一样(代码都一样); 注意:当Checkpoint使用RocksDBStateBackend并使用增量Checkpoint时会使用RocksDB内部格式,就跟Savepoint格式不一样了 | job开发新版本(更改job graph、更改并行度等等),应用重新发布 Flink版本的更新 业务迁移,集群需要迁移,不容许数据丢失 |
3).assigning Operator ID
为了便于未来升级job程序,建议为operator分配ID,例如:
DataStream<String> stream = env. //Stateful source(e.g. Kafka) with ID .addSource(new StatefulSource()) .uid("source-id") //ID for the source operator .shuffle() // Stateful mapper with ID .map(new StatefulMapper()) .name("mapper-id") //ID for the mapper //Stateless printing sink .print(); // Auto-generated ID
如果不指定ID会自动生成,只要ID不变就能从指定的Savepoint恢复。自动生成ID依赖于结构,代码会变更会导致ID改变,所以手工分配ID是推荐的做法
设置ID之后,Savepoint可以想象为一个map映射(Operator ID -> State)
4).触发Savepoint
- 直接触发Savepoint(想象你要为数据库做个备份)
$ bin/flink savepoint :jobId [:targetDirectory]
· 直接触发Savepoint(Flink on yarn):
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
· Cancel Job with Savepoint:
$ bin/flink cancel -s [:targetDirectory] :jobId
5).从Savepoint恢复job
- 从指定Savepoint恢复job
$ bin/flink run -s :savepointPath [:runArgs]
· 从指定Savepoint恢复job(允许跳过不能映射的状态,例如删除了一个operator)
$ bin/flink run -s :savepointPath -n [:runArgs]
6).删除Savepoint
· 删除Savepoint
$ bin/flink savepoint -d :savepointPath
· 注意:还可以通过常规的文件系统操作手动删除Savepoint(不影响其他Savepoint或Checkpoint)
8.状态的重新分配
Operator State 与Keyed State 的Redistribute (重新分配)
1).Operator State Redistribute
· Redistribute:当Operator改变并发度的时候(Rescale),会触发状态的Redistribute,即Operator State里的数据会重新分配到Operator的Task实例
· 举例:某Operator的并行度由3改为2
不同数据结构的动态扩展方式不一样:
- ListState:并发度在改变的时候,会将并发上的每个List都取出,然后把这些List合并到一个新的List,然后根据元素的个数在均匀分配给新的Task
- UnionListState:相比于ListState更加灵活,把划分的方式交给用户去做,当改变并发的时候,会将原来的List拼接起来。然后不做划分,直接交给用户(每个Task给全量的状态,用户自己划分)
- BroadcastState:如大表和小表做Join时,小表可以直接广播给大表的分区,在每个并发上的数据都是完全一致的。做的更新也相同,当改变并发的时候,把这些数据COPY到新的Task即可,以上是Flink Operator States提供的3种扩展方式,用户可以根据自己的需求做选择。
2).Keyed State的Redistribute(重新分配)
Keyed State Redistribute
Key被Redistribute哪个task,他对应的Keyed State就被Redistribute到哪个Task
Keyed State Redistribute是基于Key Group来做分配的:
- 将key分为group
- 每个key分配到唯一的group
- 将group分配给task实例
- Keyed State最终分配到哪个Task:group ID和taskID是从0开始算的
- hash=hash(key)
- KG=hash % numOfKeyGroups
- Subtask=KG* taskNum / numOfKeyGroups
numOfKeyGroups是有多少个组,taskNum有多少个任务,KG是组ID从0开始算,Subtask是任务ID从0开始算
CheckpointedFunction如何选择重分配策略
CheckpointedFunction接口通过不同的Redistribute方案提供对Operator State的访问
获取状态时指定:getListState/getUnionListState(注意:ListState的不同分配策略,自己要根据不同的分配策略写对应拿取数据的逻辑)
9.kafka的source与sink的容错
1.FlinkKafkaConsumer容错
1).理解FlinkKafkaSource的容错性 (影响出错时数据消费的位置)
- 如果Flink启用了检查点,Flink Kafka Consumer将会周期性的checkpoint其Kafka偏移量到快照。
- 通过实现CheckpointedFunction
- ListState<Tuple2<KafkaTopicPartition, Long>>
- 保证仅一次消费
- 如果作业失败,Flink将流程序恢复到最新检查点的状态,并从检查点中存储的偏移量开始重新消费Kafka中的记录。(此时前面所讲的消费策略就不能决定消费起始位置了,因为出故障了)
2).Flink Kafka Consumer Offset提交行为
情形 | 谁决定消费起始位置 |
---|---|
禁用Checkpoint | Flink Kafka Consumer依赖于内部使用的Kafka客户端的自动定期偏移提交功能。因此,要禁用或启用偏移量提交,只需将enable.auto.commit(或auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms设置设置到Kafka客户端的Properties |
启用Checkpoint | Checkpoint时会保存Offset到snapshot 当一次Checkpoint完成时,Flink Kafka Consumer将snapshot中的偏移量提交给 kafka/zookeeper。这确保Kafka Consumer中提交的偏移量与检查点状态中的偏移量一致。 用户可以通过调用Flink Kafka Consumer的setCommitOffsetsOnCheckpoints(boolean) ,方法来禁用或启用偏移提交到kafka/zookeeper (默认情况下,行为为true)。 在此场景中,Properties中的自动定期偏移量提交设置将被完全忽略。 |
3).不同情况下消费起始位置的分析
情形 | 谁决定消费起始位置 |
---|---|
第一次启动, 无savepoint(常规情况) | 由消费模式决定 |
通过savepoint启动(应用升级,比如加 大并行度) | 由savepoint记录的offset决定 |
有checkpoint,失败后,job恢复的情况 | 由checkpoint的snapshoot中记录的offset决定 |
无checkpoint,失败后,job恢复的情况 | 由消费模式决定 |
2.FlinkKafkaProducer容错
版本 | 容错性保证 |
---|---|
Kafka 0.8 | at most once(有可能丢数据) |
Kafka 0.9/0.10 | 启动checkpoint时默认保证at-least-once(有可能重复) setLogFailuresOnly(boolean) 默认是false(false保证at-least-once)往kafka发送数据失败了是否打日志: False:不打日志,直接抛异常,导致应用重启(at-least-once) True:打日志(丢数据) setFlushOnCheckpoint(boolean) 默认是true(true保证at_least_once)Flink checkpoint时是否等待正在写往kafka的数据返回ack |
Kafka 0.11 | 必须启动checkpoint 可以通过构造参数选择容错性语意: Semantic.NONE:可能丢失也可能重复 Semantic.AT_LEAST_ONCE:不会丢失,但可能重复(默认) Semantic.EXACTLY_ONCE:使用Kafka事务提供exactly-once语义 |
Properties producerPropsSns = new Properties();
producerPropsSns.setProperty("bootstrap.servers", "s1.hadoop:9092,s3.hadoop:9092,s4.hadoop:9092,s5.hadoop:9092,s6.hadoop:9092,s7.hadoop:9092,s8.hadoop:9092");
producerPropsSns.setProperty("retries", "3");
//FlinkKafkaProducer010类的构造函数支持自定义kafka的partitioner,
FlinkKafkaProducer010 kafkaOut = new FlinkKafkaProducer010<String>("flink_event_result",new SimpleStringSchema(),producerPropsSns,new HainiuFlinkPartitioner());
kafkaOut.setLogFailuresOnly(false);
kafkaOut.setFlushOnCheckpoint(true);