首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark Cloudfiles Autoloader BlobStorage Azure java.io.IOException:尝试从关闭的流中读取

Spark Cloudfiles Autoloader BlobStorage Azure java.io.IOException:尝试从关闭的流中读取
EN

Stack Overflow用户
提问于 2021-01-06 18:35:31
回答 1查看 148关注 0票数 0

我学会了在SPARK 3上使用新的自动加载器流媒体方法,我遇到了这个问题。在这里,我试图监听简单的json文件,但是我的流永远不会启动。

我的代码(删除凭证):

代码语言:javascript
复制
from pyspark.sql.types import StructType, StringType, IntegerType

azdls_connection_string = "My connection string"
queue_name = "queu-name"

stream_schema = StructType() \
  .add("timestamp", StringType(), False) \
  .add("temperature", IntegerType(), False)

ressource_group = ""
cloudfiles_subid_telem = ""
cloudfiles_clientid_telem = ""
cloudfiles_clientsecret_telem = ""
tenantid = ""
conainer_name = "mydb"
abs_fs = "abfss://" + conainer_name + "@" + dls_name + ".dfs.core.windows.net"

read_stream = (
  spark.readStream.format("cloudFiles")
  .option("cloudFiles.useNotifications", True)
  .option("cloudFiles.format", "json")
  .option("cloudFiles.connectionString", azdls_connection_string)
  .option("cloudFiles.resourceGroup", ressource_group)
  .option("cloudFiles.subscriptionId", cloudfiles_subid_telem)
  .option("cloudFiles.tenantId", tenantid)
  .option("cloudFiles.clientId", cloudfiles_clientid_telem)
  .option("cloudFiles.clientSecret", cloudfiles_clientsecret_telem)
  .option("cloudFiles.region", "francecentral")
  .schema(stream_schema)
  .option("cloudFiles.includeExistingFiles", False)
  .load(abs_fs + "/input")
)

checkpoint_path = abs_fs + "/checkpoints"
out_path = abs_fs + "/out"

df = read_stream.writeStream.format("delta") \
  .option("checkpointLocation", checkpoint_path) \
  .start(out_path)

当我尝试开始我的流媒体时,我得到了一个错误。我的权限设置正确,因为我的Azure队列已创建。我在databricks网站上的autloader文档中找不到任何关于此错误的信息。

下面是我的错误:

代码语言:javascript
复制
java.io.IOException: Attempted read from closed stream.
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:165)
    at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.Reader.read(Reader.java:140)
    at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:2001)
    at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1980)
    at org.apache.commons.io.IOUtils.copy(IOUtils.java:1957)
    at org.apache.commons.io.IOUtils.copy(IOUtils.java:1907)
    at org.apache.commons.io.IOUtils.toString(IOUtils.java:778)
    at org.apache.commons.io.IOUtils.toString(IOUtils.java:759)
    at com.databricks.sql.aqs.EventGridClient.prettyResponse(EventGridClient.scala:428)
    at com.databricks.sql.aqs.EventGridClient.com$databricks$sql$aqs$EventGridClient$$errorResponse(EventGridClient.scala:424)
    at com.databricks.sql.aqs.EventGridClient$$anonfun$createEventSubscription$3.applyOrElse(EventGridClient.scala:235)
    at com.databricks.sql.aqs.EventGridClient$$anonfun$createEventSubscription$3.applyOrElse(EventGridClient.scala:229)
    at com.databricks.sql.aqs.EventGridClient.executeRequest(EventGridClient.scala:387)
    at com.databricks.sql.aqs.EventGridClient.createEventSubscription(EventGridClient.scala:226)
    at com.databricks.sql.aqs.autoIngest.AzureEventNotificationSetup.$anonfun$setupEventGridSubscription$1(AzureEventNotificationSetup.scala:135)
    at scala.Option.getOrElse(Option.scala:189)
    at com.databricks.sql.aqs.autoIngest.AzureEventNotificationSetup.setupEventGridSubscription(AzureEventNotificationSetup.scala:121)
    at com.databricks.sql.aqs.autoIngest.AzureEventNotificationSetup.<init>(AzureEventNotificationSetup.scala:75)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.databricks.sql.fileNotification.autoIngest.EventNotificationSetup$.$anonfun$create$1(EventNotificationSetup.scala:66)
    at com.databricks.sql.fileNotification.autoIngest.ResourceManagementUtils$.unwrapInvocationTargetException(ResourceManagementUtils.scala:42)
    at com.databricks.sql.fileNotification.autoIngest.EventNotificationSetup$.create(EventNotificationSetup.scala:50)
    at com.databricks.sql.fileNotification.autoIngest.CloudFilesSourceProvider.$anonfun$createSource$2(CloudFilesSourceProvider.scala:162)
    at scala.Option.getOrElse(Option.scala:189)
    at com.databricks.sql.fileNotification.autoIngest.CloudFilesSourceProvider.createSource(CloudFilesSourceProvider.scala:154)
    at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:306)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$1(MicroBatchExecution.scala:93)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:90)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:88)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:322)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:80)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:322)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:166)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:164)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:311)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:88)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:346)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:269)

提前谢谢你

EN

回答 1

Stack Overflow用户

发布于 2021-01-06 21:34:18

好了,我找到我的问题了。当我尝试abqs连接器时,我有一个与前一个流的冲突。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65594160

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档