我已经编写了一个消费者,它读取卡夫卡主题,并使用StreamSink以拼图格式写入数据。但是我得到了以下错误
java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.CharSequence
at org.apache.parquet.avro.AvroWriteSupport.fromAvroString(AvroWriteSupport.java:371)
at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:346)
at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)代码很简单:
DataStream<GenericRecord> sourceStream = env.addSource(bikeDetailsKafkaConsumer010;
final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat
(path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(schemaSubject)))
.withBucketAssigner(new EventTimeBucketAssigner())
.build();
sourceStream.addSink(sink).setParallelism(parallelism);是不是某些记录有问题,以及如何调试哪个记录出现异常。
发布于 2020-03-02 18:11:17
如果你在编写Parquet/Avro时得到一个ClassCastException,那么模式和数据几乎总是不匹配的。
我建议打印出预期的模式和格式化为json的实际记录(使用toString,甚至更好,使用类型信息)。
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos);
writer.write(genericRecord, encoder);
encoder.flush();
baos.flush();
return new String(baos.toByteArray());https://stackoverflow.com/questions/60458366
复制相似问题