600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > Flink开发项目中遇到的一些问题总汇(持续更新)

Flink开发项目中遇到的一些问题总汇(持续更新)

时间:2020-09-24 23:38:31

相关推荐

Flink开发项目中遇到的一些问题总汇(持续更新)

问题 解决:

1、(Constants.A, Constants.B) 是定位到具体配置

2、设置flink全局变量

env.getConfig.setGlobalJobParameters(C)

但是某些算子可能用到非全局变量中的一些配置例如:map、process中open 函数中的配置需要局部配置变量。

3、map和process的联系:

map方法不允许缺少数据,也就是原来多少条数据,处理后依然是多少条数据,只是用来做转换。

本次开发map函数用来将source流中的数据转换成json model 对象

.map(JsonUtil.fromJson[LogModel] _).setParallelism(dealParallelism)

为什么不在map中写逻辑规则,是因为map方法不允许缺少数据,数据不知道什么时候来数据调用map需要吃资源,采用process,可以懒加载???

process中:

events (流元素)

state (容错,一致性,只在Keyed Stream)

timers (事件时间和处理时间, 只在keyed stream)

eg:

.process(new FProcess)class FProcess extends ProcessFunction[LogModel, (Object, Boolean)] {//mutable.HashSet可变的hashsetval cmdSet: mutable.HashSet[String] = new mutable.HashSet[String]()override def open(parameters: Configuration): Unit = {//配置局部变量val globConf = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[Configuration]val cmdStr = globConf.getString(Constants.D, "")val splits = cmdStr.split("\\|")for (cmd <- splits) {cmdSet.add(cmd)}}override def processElement(model: LogModel, ctx: ProcessFunction[LogModel, (Object, Boolean)]#Context,out: Collector[(Object, Boolean)]): Unit = {val action = model.actionif (action.length > 0) {breakable {for (cmd <- cmdSet) {if (action.contains(cmd)) {val entity = new Eentity.setCmd()entity.setSourceIp(model.A)entity.setUserName(model.B)entity.setDestinationIp(model.C)entity.setWarnLevel(4)out.collect((entity, true))break()}}}}

4、

E:RabbitMQ 中该模块的queue,类似kafka中的topic

val stream4A = env.addSource(new RMQSource(connectionConfig, E, false, new SimpleStringSchema()))

5、个人理解:因为某些业务有时间限制,而数据源本身会因为网略等因素出现延迟等现象,采用时间戳得方式判断不够严谨,所以采用封装event采用判断事件发生的时间更加合理。

AssignerWithPeriodicWatermarks 周期性的生成watermark,生成间隔可配置,根据数据的eventTime来更新watermark时间

AssignerWithPunctuatedWatermarks 不会周期性生成watermark,只根据元素的eventTime来更新watermark。

当用EventTime和ProcessTime来计算时,元素本身都是不带时间戳的,只有以IngestionTime计算时才会打上进入系统的时间戳。

以下为AssignerWithPunctuatedWatermarks需要重写得两个方法

override def checkAndGetNextWatermark(lastElement: LogModel, extractedTimestamp: Long): Watermark = {//设定一个10s的缓冲区new Watermark(extractedTimestamp - 10000)}override def extractTimestamp(element: LogModel, previousElementTimestamp: Long): Long = {//事件时间element.logTime}

无论何时一个特定的事件表明一个新的watermark可能需要被创建,都使用AssignerWithPunctuatedWatermarks来生成。

在这个类中Flink首先调用extractTimestamp(…)来为元素分配一个timestamp,然后立即调用该元素上的checkAndGetNextWatermark(…)方法。

checkAndGetNextWatermark(…)方法传入在extractTimestamp(…)方法中分配的timestamp,并决定是否需要生产watermark。一旦checkAndGetNextWatermark(…)返回一个非空的watermark并且watermark比前一个watermark大的话,这个新的watermark将会被发送。

6、Timestamp和Watermark的关系和区别:

指定时间戳(Assigning Timestamps)

为了使用eventtime,Flink需要知道事件的时间戳,也就是说数据流中的元素需要分配一个事件时间戳。

这个通常是通过抽取或者访问事件中某些字段的时间戳来获取的。

时间戳的分配伴随着水印的生成,告诉系统事件时间中的进度。

这里有两种方式来分配时间戳和生成水印:

1、直接在数据流源中进行

2、通过timestamp assigner和watermark generator生成:在Flink中,timestamp 分配器也定义了用来发射的水印。

注意:timestamp和watermark都是通过从1970年1月1日0时0分0秒到现在的毫秒数来指定的。

有Timestamp和Watermark的源函数(Source Function with Timestamps And Watermarks)

数据流源可以直接为它们产生的数据元素分配timestamp,并且他们也能发送水印。

这样做的话,就没必要再去定义timestamp分配器了,需要注意的是:如果一个timestamp分配器被使用的话,由源提供的任何timestamp和watermark都会被重写。

为了通过源直接为一个元素分配一个timestamp,源需要调用SourceContext中的collectWithTimestamp(…)方法。为了生成watermark,源需要调用emitWatermark(Watermark)方法。

下面是一个简单的(无checkpoint)由源分配timestamp和产生watermark的例子:

override def run(ctx: SourceContext[MyType]): Unit = {while (/* condition */) {val next: MyType = getNext()ctx.collectWithTimestamp(next, next.eventTimestamp)if (next.hasWatermarkTime) {ctx.emitWatermark(new Watermark(next.getWatermarkTime))}}}

7、TimeStamp分配器和Watermark生成器(Timestamp Assigners / Watermark Generators)

Timestamp分配器获取一个流并生成一个新的带有时间戳元素和水印的流。

如果原来的流中已经有了timestamp和/或水印的话,这个timestamp分配器会覆盖掉。

Timestamp分配器常常在数据源之后就立即指定了,但是并不是要严格这么做,一个常用的模式是先解析(MapFunction)和过滤(FilterFunction)后再指定timestamp 分配器。在任何情况下,时间戳分配器都必须在第一个在事件时间上运行的操作(如:第一个时间窗口操作)之前指定。有一个特殊情况,当使用Kafka作为流作业的数据源时,Flink允许在源内部指定timestamp分配器和watermark生成器。更多关于如何进行的信息请参考Kafka Connector的文档。

接下来的部分展示了要创建自己的timestamp 抽取器和watermark发射器,程序员需要实现的主要接口。想要查看Flink预定义的抽取器,请前往预定于Timestamp Extractors/Watermark Emitter页面。

val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream: DataStream[MyEvent] = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter());val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.filter( _.severity == WARNING ).assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())withTimestampsAndWatermarks.keyBy( _.getGroup ).timeWindow(Time.seconds(10)).reduce( (a, b) => a.add(b) ).addSink(...)

8、周期性水印(With Periodic Watermarks):

AssignerWithPeriodicWatermarks周期性地分配timestamp和生成watermark(可能依赖于元素或者纯粹基于处理时间)。

watermark产生的事件间隔(每n毫秒)是通过ExecutionConfig.setAutoWatermarkInterval(…)来定义的,每当分配器的getCurrentWatermark()方法被调用时,如果返回的watermark是非空并且大于上一个watermark的话,一个新的watermark将会被发射。

class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {val maxOutOfOrderness = 3500L; // 3.5 secondsvar currentMaxTimestamp: Long;override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {val timestamp = element.getCreationTime()currentMaxTimestamp = max(timestamp, currentMaxTimestamp)timestamp;}override def getCurrentWatermark(): Watermark = {// return the watermark as current highest timestamp minus the out-of-orderness boundnew Watermark(currentMaxTimestamp - maxOutOfOrderness);}}class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {val maxTimeLag = 5000L; // 5 secondsoverride def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {element.getCreationTime}override def getCurrentWatermark(): Watermark = {// return the watermark as current time minus the maximum time lagnew Watermark(System.currentTimeMillis() - maxTimeLag)}}

9、带断点的水印(With Punctuated Watermarks)

无论何时一个特定的事件表明一个新的watermark可能需要被创建,都使用AssignerWithPunctuatedWatermarks来生成。在这个类中Flink首先调用extractTimestamp(…)来为元素分配一个timestamp,然后立即调用该元素上的checkAndGetNextWatermark(…)方法。

checkAndGetNextWatermark(…)方法传入在extractTimestamp(…)方法中分配的timestamp,并决定是否需要生产watermark。一旦checkAndGetNextWatermark(…)返回一个非空的watermark并且watermark比前一个watermark大的话,这个新的watermark将会被发送。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {element.getCreationTime}override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null}}

#------------------------(6月6日 11:37:36)-------------------------------

最近太忙了一直没有更新

10、本次项目开发数据流向

多Sink 写出 Flink有很好的支持,不要像我傻兮兮的去找,自己实际动手测试一下。

数据源:RMQ (读取数据源没什么大问题,读嘛,能有什么问题呢)

数据处理:根据业务处理做不同处理。

数据输出:这个是真的坑,还花我钱买专栏,Flink 支持多路输出直接将处理好的数据直接addsink 就好,没必要想太多,当时太菜了,以为中间可能会有什么坑,也懒得试就直接买了某个专栏 。啥也不是。

本次项目数据输出有点多:

1、输出到Mysql的告警表,做前端展示。2、将产生告警的数据的原始数据写入ES,做数据追溯3、将告警数据写入RMQ1,做告警得归并,入另一个系统。项目迭代:新写入一个RMQ2 ,做复杂事件处理。

写入Mysql,调用写好得SqlHelper ,这个我会在后期项目结束,将完整的SqlHelper 发出来

//写入mysqlval alartId: Long = sqlHelper.insertGeneratedKey(entity)//写入mysql的Help方法public long insertGeneratedKey(Object object) {if (object == null) {return -1L;}//获取类型类Class clazz = object.getClass();//获取连接conn = C3P0Util.getConnection();//获取该类的所有字段Field[] fields = clazz.getDeclaredFields();StringBuilder sql = new StringBuilder();StringBuilder sqlValue = new StringBuilder();//获取该类的数据库名和表名Table table = (Table) clazz.getAnnotation(Table.class); //得到类的注解String tableName = table.name();//编辑插入sqlsql.append("INSERT INTO ").append(getDatabaseTableName(database,tableName)).append(" (");for (int i = 0; i < fields.length; i++) {//使其可以访问私有字段fields[i].setAccessible(true);//根据字段获取起get方法PropertyDescriptor propertyDescriptor = null;try {propertyDescriptor = new PropertyDescriptor(fields[i].getName(),clazz);} catch (IntrospectionException e) {e.printStackTrace();}if (propertyDescriptor != null) {Method method = propertyDescriptor.getReadMethod();//获取其对应数据库的字段名Column column = method.getAnnotation(Column.class);sql.append(column.name());sqlValue.append("?");if (i != fields.length - 1) {sql.append(",");sqlValue.append(",");}}}sql.append(") ").append("VALUES (").append(sqlValue).append(")");try {ps = conn.prepareStatement(sql.toString(), Statement.RETURN_GENERATED_KEYS);logger.info(sql.toString());Field[] fieldsTemp = clazz.getDeclaredFields();for (int i = 0; i < fieldsTemp.length; i++) {fieldsTemp[i].setAccessible(true);ps.setObject(i + 1, fieldsTemp[i].get(object));}ps.execute();rs = ps.getGeneratedKeys();if (rs.next()) {return rs.getLong(1);}} catch (Exception e) {e.printStackTrace();} finally {C3P0Util.close(conn, ps, rs);}return -1L;}

//写入esstream.flatMap(new FromModelToMapFunction).addSink(new ElasticsearchSink[java.util.Map[String, Object]](ESConfig, transportAddresses,new WarnLogESSink(esIndex, esType))).setParallelism(sinkParallelism)//FromModelToMapFunctionclass FromModelToMapFunction extends RichFlatMapFunction[(Object, String, Int, Long, ArrayBuffer[LogModel]),java.util.Map[String, Object]] {override def flatMap(value: (Object, String, Int, Long, ArrayBuffer[LogModel]),out: Collector[java.util.Map[String, Object]]): Unit = {val ab = value._5for (model <- ab) {val option = LogModel.fromModelToMap(value._2, value._3, value._4, model)val map: util.Map[String, Object] = option.get// if (!map.isEmpty) {// }out.collect(map)}}}//WarnLogESSinkclass WarnLogESSink(index: String, `type`: String)//class AlarmLogESSink(index: String, `type`: String)extends ElasticsearchSinkFunction[java.util.Map[String, Object]] {def createIndexRequest(element: java.util.Map[String, Object]): IndexRequest = {return Requests.indexRequest().index(index)//.index(index).`type`(`type`).source(element)}override def process(t: util.Map[String, Object], runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {requestIndexer.add(createIndexRequest(t))}}//

//写入rmqstream.map(t => JsonUtil.toJson(t._1)).addSink(new RichRMQSink(connectionConfig, rmqQueueName,new SimpleStringSchema())).setParallelism(sinkParallelism)

是因为Flink 源码中的问题,这里我粘出来。

//这是RmqSink中setupQueue方法/*** Sets up the queue. The default implementation just declares the queue. The user may override* this method to have a custom setup for the queue (i.e. binding the queue to an exchange or* defining custom queue parameters)*/protected void setupQueue() throws IOException {if (queueName != null) {channel.queueDeclare(queueName, false, false, false, null);}}//这是RmqSource 中的setupQueue 方法/*** Sets up the queue. The default implementation just declares the queue. The user may override* this method to have a custom setup for the queue (i.e. binding the queue to an exchange or* defining custom queue parameters)*/protected void setupQueue() throws IOException {channel.queueDeclare(queueName, true, false, false, null);}

参数解释:

第二个参数是durable。队列是否持久化。false为不支持。一般在我们写入RabbitMQ之前,RabbitMQ的对列已经创建好,有些会指定durable为true。这个时候就有问题了,因为RabbitMQ队列durable属性不同,就会一直去连接,直到RabbitMQ挂掉或程序关掉。

发现问题了么????印象中的问题是:RabbitMQ Web UI,会发现自己创建RabbitMQ和系统创建RabbitMQ的队列属性不一致 ,就是序列化的问题

channel.queueDeclare(queueName, false, false, false, null);channel.queueDeclare(queueName, true, false, false, null);

//重写setupQueue方法public class RichRMQSink extends RMQSink<String> {public RichRMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<String> schema) {super(rmqConnectionConfig, queueName, schema);}/*** 此方法必须重写,如果RabbitMQ的queue的durable属性设置为true,则会导致RabbitMQ会一直connection,导致RabbitMQ耗尽资源挂掉*/@Overrideprotected void setupQueue() {try{if (queueName != null) {channel.queueDeclare(queueName, true, false, false, null);}}catch (IOException e){System.out.println("io exception");}}}

11、并行度+定时器+触发口诀:单流取最大,多流取最小。

12、

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。