
Offsets committed out of order with Spark DataSource API V2 Hive streaming sink

Stack Overflow user
Asked 2018-05-23 16:45:13
1 answer, 638 views, 0 followers, 1 vote

I am saving a Spark 2.3 Structured Streaming DataFrame to a Hive table through a custom sink implementation.

The code is as follows.

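import spark.implicits._  // needed for .as[String] and map; spark-shell imports it automatically

val df = spark.readStream.format("socket").option("host", "localhost").option("port", 19191).load().as[String]

// Parse each comma-separated line into (eid, name, salary, designation)
val query = df.map { s =>
        val records = s.split(",")
        assert(records.length >= 4)
        (records(0).toInt, records(1), records(2), records(3))
     }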


query.selectExpr("_1 as eid", "_2 as name", "_3 as salary", "_4 as designation").
      writeStream.
      format("hive-streaming").
      option("metastore", ".....").
      option("db", "test").
      option("table", "test_employee").
      option("checkpointLocation", "/checkpoints/employee/checkpoint").
      queryName("socket-hive-streaming").
      start()

This results in the following runtime error.

ERROR streaming.MicroBatchExecution: Query socket-hive-streaming [id =  ......, runId = ......] terminated with error
java.lang.RuntimeException: Offsets committed out of order: 1 followed by 0
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.execution.streaming.TextSocketSource.commit(socket.scala:146)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$4.apply(MicroBatchExecution.scala:356)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$4.apply(MicroBatchExecution.scala:355)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:355)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:338)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

1 Answer

Stack Overflow user

Accepted answer

Posted 2018-08-14 09:24:31

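There are two ways to fix the problem:

  1. Delete/clear your checkpoint directory: /checkpoints/employee/checkpoint on your machine.
  2. Use another source that maintains offsets, such as Kafka.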
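
The second option can be sketched as follows. This is only an illustration, not the asker's actual setup: the broker address `localhost:9092`, the topic name `employees`, the application name, and the new checkpoint path are all assumptions, and the console sink stands in for the custom `hive-streaming` sink so that the sketch does not depend on it. It also assumes the `spark-sql-kafka-0-10` package is on the classpath.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object KafkaSourceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-hive-streaming") // hypothetical application name
      .getOrCreate()
    import spark.implicits._

    // Kafka keeps offsets per partition, so offsets stored in the
    // checkpoint still refer to real positions after a restart.
    // Broker address and topic name are assumptions for this sketch.
    val lines: Dataset[String] = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "employees")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Same parsing as in the question: "eid,name,salary,designation"
    val employees = lines.map { s =>
      val records = s.split(",")
      assert(records.length >= 4)
      (records(0).toInt, records(1), records(2), records(3))
    }.toDF("eid", "name", "salary", "designation")

    // Console sink used as a stand-in for the custom sink.
    // A fresh checkpoint directory (hypothetical path) avoids reusing
    // offsets that were written for the socket source.
    val query = employees.writeStream
      .format("console")
      .option("checkpointLocation", "/checkpoints/employee/checkpoint-kafka")
      .queryName("kafka-hive-streaming")
      .start()

    query.awaitTermination()
  }
}
```

Switching the source while keeping the old checkpoint directory is likely to fail as well, which is why the sketch points at a new one.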

The reason you hit this problem is that the socket source does not maintain offset information across restarts.

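When you restart the job that receives its input from the socket (port 19191 in the question), the first thing it does is try to recover state from /checkpoints/employee/checkpoint, where it finds that the last recorded offset is 1. When you then send new messages to the socket, the job sees the socket's offset as 0, because the restarted source starts counting from scratch. Offset 0 arriving after offset 1 is out of order, so it throws this exception.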
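
The sequence described above can be reproduced with a toy model. This is a simplified sketch and not Spark's `TextSocketSource`: the class `ToySocketSource` and the object `RestartDemo` are hypothetical names, and the only behaviour modelled is a source that keeps its last committed offset in memory and rejects a commit that goes backwards.

```scala
// Toy model only: a source whose offsets live in memory and are
// therefore lost whenever the job restarts.
final class ToySocketSource {
  // A fresh instance has not committed anything yet.
  private var lastCommitted: Long = -1L

  def commit(end: Long): Unit = {
    if (end < lastCommitted) {
      // sys.error throws a RuntimeException with this message
      sys.error(s"Offsets committed out of order: $lastCommitted followed by $end")
    }
    lastCommitted = end
  }
}

object RestartDemo {
  // Returns "ok" if both commits succeed, otherwise the error message.
  def run(): String = {
    val source = new ToySocketSource // restarted job: counting starts over
    source.commit(1L)                // offset 1 replayed from the checkpoint
    try {
      source.commit(0L)              // first offset produced by the fresh socket
      "ok"
    } catch {
      case e: RuntimeException => e.getMessage
    }
  }
}
```

Calling `RestartDemo.run()` returns the same message as the stack trace in the question, which is why clearing the checkpoint (so that no stale offset 1 is replayed) makes the error go away.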

3 votes
Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/50493552
