Flink:-未返回JobSubmissionResult,请确保您调用了ExecutionEnvironment.execute()

上周,我正在一个基于Apache Flink的项目中工作,在该项目中,我向流中添加了更多子任务。 当通过flink CLI提交作业时,我没有No JobSubmissionResult返回错误。

我们知道,当我们忘记调用ExecutionEnvironment.execute()时,也不会发生JobSubmissionResult。 但是以我为例,当我尝试使用命令使用Flink CLI进行部署时,我仍然调用env.execute(“ Stream Job”)仍然失败,尽管当我尝试将任务jar部署到Web界面时,任务已成功部署。

  ./flink运行/opt/flink-jobs/stock-ohlc-job.jar 

我在下面的堆栈跟踪中注意到,这使作业未找到异常。

 错误akka.remote.EndpointWriter-暂时关联错误(关联保持活动状态) 
akka.remote.OversizedPayloadException:丢弃发送给Actor的过大有效载荷[akka.tcp://flink@172.XX.XX.XX:6123 / user / jobmanager#467971694]:最大允许大小为10485760字节,编码类org的实际大小.apache.flink.runtime.messages.JobManagerMessages $ LeaderSessionMessage是13846328字节。
 引起原因:org.apache.flink.runtime.client.JobExecutionException:无法从JobManager检索JobExecutionResult。 
在org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
在org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
在org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
...还有19个
原因:org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:向JobManager提交作业超时。 如果JobManager需要更多时间来配置和确认作业提交,则可以增加“ akka.client.timeout”。
在org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:127)
在org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
在org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:92)
在org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:71)
在akka.actor.UntypedActor $$ anonfun $ receive $ 1.applyOrElse(UntypedActor.scala:165)
在akka.actor.Actor $ class.aroundReceive(Actor.scala:502)
在akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
在akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)上
在akka.actor.ActorCell.invoke(ActorCell.scala:495)
在akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
在akka.dispatch.Mailbox.run(Mailbox.scala:224)
在akka.dispatch.Mailbox.exec(Mailbox.scala:234)
在scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
在scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask(ForkJoinPool.java:1339)
在scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
在scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018年

我们可以看到以粗体显示的stacktrace的一部分,这表明它丢弃了一部分有效负载,我在Internet上进行了研究,并了解了以下解决方案。

我将以下行添加到flink-conf.yml

解决方案:-在flink-conf.yml中增加Akka帧大小

## Akka
akka.framesize:209715200b

参考链接:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/flink-akka-OversizedPayloadException-error-td12218.html

尽管stacktrace还建议增加“ akka.client.timeout”位,但在我的情况下却不起作用。