
I ran into a problem when using readStream().format("kafka")

Stack Overflow user
Asked on 2020-04-09 23:57:35
1 answer, 110 views, 0 followers, 0 votes

Please help me fix this error:

20/04/09 18:38:44 ERROR MicroBatchExecution: Query [id = 9f3cbbf6-85a8-4aed-89c6-f5d3ff9c40fa, runId = 73c071c6-e222-4760-a750-393666a298af] terminated with error
java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
20/04/09 18:38:44 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

My code is:

SQLContext sqlContext = new SQLContext(HadoopWork.sc);
sqlContext
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", IKafkaConstants.KAFKA_BROKERS)
        .option("subscribe", IKafkaConstants.TOPIC_NAME)
        .option("startingOffsets", "earliest")
        .option("enable.auto.commit", false)
        .option("checkpointLocation", "/tmp/checkpoint/1")
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .writeStream()
        .format("parquet")
        .outputMode("append")
        .option("checkpointLocation", "/tmp/checkpoint/2")
        .option("compression", "gzip")
        .option("path", "/user/test/")
        .option("enable.auto.commit", false)
        .option("startingOffsets", "earliest")
        .start();

1 Answer

Stack Overflow user

Answered on 2020-04-10 00:02:26

This appears to be an older known bug in Structured Streaming.

Upgrading to Spark 2.3.2+ should resolve it for you.
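
Before and after the suggested upgrade, it can help to confirm which Spark version the job actually runs against. The following is a minimal sketch, not part of the original answer: `SparkVersionCheck` and `isAtLeast` are hypothetical helper names, and the sample string `"2.3.1"` is only a placeholder. In a real job the version string would come from `SparkSession.version()` or `SparkContext.version()`.

```java
public class SparkVersionCheck {

    /**
     * Returns true if a version string such as "2.4.5" or "2.3.2-SNAPSHOT"
     * is greater than or equal to major.minor.patch.
     * Hypothetical helper for illustration; it assumes the first three
     * components of the string are plain integers.
     */
    public static boolean isAtLeast(String version, int major, int minor, int patch) {
        int[] wanted = {major, minor, patch};
        // Split on dots and dashes so a suffix such as "-SNAPSHOT" lands in its own part.
        String[] parts = version.trim().split("[.-]");
        for (int i = 0; i < wanted.length; i++) {
            // A missing component (for example "2.4") is treated as 0.
            int have = i < parts.length ? Integer.parseInt(parts[i]) : 0;
            if (have != wanted[i]) {
                return have > wanted[i];
            }
        }
        return true; // all three components are equal
    }

    public static void main(String[] args) {
        // Placeholder input; pass the real value of spark.version() as the first argument.
        String version = args.length > 0 ? args[0] : "2.3.1";
        System.out.println(version + " >= 2.3.2: " + isAtLeast(version, 2, 3, 2));
    }
}
```

If the check reports a version below 2.3.2, the answer's suggestion is to upgrade before investigating further.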

Votes: 0
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/61125231
