当运行worker but exception时,我使用了org.springframework.kafka.support.serializer.JsonDeserializer和org.apache.kafka.common.serialization.ByteArrayDeserializer,它们在主控端正常工作,但在worker端出现异常.最后我得到的原因是: java.lang.StackOverflowError: null
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'handler'; defined in: 'batch.configuration.WorkerConfiguration'; from source: 'org.springframework.core.type.StandardMethodMetadata@7f811d00']; nested exception is org.apache.kafka.common.errors.SerializationException: Can't serialize data [StepExecution: id=5, version=93, name=workerStep:partition4, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=31, rollbackCount=0, exitDescription=] for topic [reply], failedMessage=GenericMessage [payload=StepExecution: id=5, version=93, name=workerStep:partition4, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=31, rollbackCount=0, exitDescription=, headers={sequenceNumber=1, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@454b7395, sequenceSize=10, kafka_timestampType=CREATE_TIME, kafka_replyTopic=reply, kafka_receivedTopic=requests, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@454b7395, kafka_offset=5, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1a6c92e3, kafka_correlationId=[B@ef1f7b2, correlationId=1:workerStep, id=49d312f5-618f-5085-fad1-7868e8e06aa0, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1603368744168, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = requests, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1603368744168, serialized key size = -1, serialized value size = 64, headers = RecordHeaders(headers = [RecordHeader(key = sequenceNumber, value = [49]), RecordHeader(key = sequenceSize, value = [49, 48]), RecordHeader(key = correlationId, value = [49, 58, 119, 111, 114, 107, 101, 114, 83, 116, 101, 112]), RecordHeader(key = kafka_replyTopic, value = [114, 101, 112, 108, 121]), RecordHeader(key = spring_json_header_types, value = [123, 34, 115, 101, 113, 117, 101, 110, 99, 101, 78, 117, 109, 98, 101, 114, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 115, 101, 113, 117, 101, 110, 99, 101, 83, 105, 122, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 99, 111, 114, 114, 101, 108, 97, 116, 105, 111, 110, 73, 100, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 107, 97, 102, 107, 97, 95, 114, 101, 112, 108, 121, 84, 111, 112, 105, 99, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125]), RecordHeader(key = kafka_replyTopic, value = [114, 101, 112, 108, 121]), RecordHeader(key = kafka_correlationId, value = [-127, -49, 126, 33, 100, 99, 79, -14, -81, -35, -70, -88, -61, 37, 61, -63])], isReadOnly = false), key = null, value = StepExecutionRequest: [jobExecutionId=1, stepExecutionId=5, stepName=workerStep]), kafka_groupId=repliesGroup, timestamp=1603379041885}]
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:157) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1887) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1792) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1719) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1617) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1348) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1064) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'handler'; defined in: 'batch.configuration.WorkerConfiguration'; from source: 'org.springframework.core.type.StandardMethodMetadata@7f811d00']; nested exception is org.apache.kafka.common.errors.SerializationException: Can't serialize data [StepExecution: id=5, version=93, name=workerStep:partition4, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=31, rollbackCount=0, exitDescription=] for topic [reply], failedMessage=GenericMessage [payload=StepExecution: id=5, version=93, name=workerStep:partition4, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=31, rollbackCount=0, exitDescription=, headers={sequenceNumber=1, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@454b7395, sequenceSize=10, kafka_timestampType=CREATE_TIME, kafka_replyTopic=reply, kafka_receivedTopic=requests, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@454b7395, kafka_offset=5, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1a6c92e3, kafka_correlationId=[B@ef1f7b2, correlationId=1:workerStep, id=49d312f5-618f-5085-fad1-7868e8e06aa0, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1603368744168, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = requests, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1603368744168, serialized key size = -1, serialized value size = 64, headers = RecordHeaders(headers = [RecordHeader(key = sequenceNumber, value = [49]), RecordHeader(key = sequenceSize, value = [49, 48]), RecordHeader(key = correlationId, value = [49, 58, 119, 111, 114, 107, 101, 114, 83, 116, 101, 112]), RecordHeader(key = kafka_replyTopic, value = [114, 101, 112, 108, 121]), RecordHeader(key = spring_json_header_types, value = [123, 34, 115, 101, 113, 117, 101, 110, 99, 101, 78, 117, 109, 98, 101, 114, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 115, 101, 113, 117, 101, 110, 99, 101, 83, 105, 122, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 99, 111, 114, 114, 101, 108, 97, 116, 105, 111, 110, 73, 100, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 107, 97, 102, 107, 97, 95, 114, 101, 112, 108, 121, 84, 111, 112, 105, 99, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125]), RecordHeader(key = kafka_replyTopic, value = [114, 101, 112, 108, 121]), RecordHeader(key = kafka_correlationId, value = [-127, -49, 126, 33, 100, 99, 79, -14, -81, -35, -70, -88, -61, 37, 61, -63])], isReadOnly = false), key = null, value = StepExecutionRequest: [jobExecutionId=1, stepExecutionId=5, stepName=workerStep]), kafka_groupId=repliesGroup, timestamp=1603379041885}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1902) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
... 10 common frames omitted
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'handler'; defined in: 'batch.configuration.WorkerConfiguration'; from source: 'org.springframework.core.type.StandardMethodMetadata@7f811d00']; nested exception is org.apache.kafka.common.errors.SerializationException: Can't serialize data [StepExecution: id=5, version=93, name=workerStep:partition4, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=31, rollbackCount=0, exitDescription=] for topic [reply]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:79) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.工人代码
@SpringBootApplication
@EnableBatchProcessing
@EnableBatchIntegration
@EnableIntegration
@ImportResource("context.xml")
public class WorkerConfiguration {
private final RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
private final JobBuilderFactory jobBuilderFactory;
@Autowired
ProducerFactory<String, String> producerFactory;
@Autowired
public JobExplorer jobExplorer;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public WorkerConfiguration(JobBuilderFactory jobBuilderFactory, RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.workerStepBuilderFactory = workerStepBuilderFactory;
}
/*
* Configure inbound flow (requests coming from the master)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
/*
* Configure outbound flow (replies going to the master)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
/*
protected JobRepository createMyJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setTransactionManager(new ResourcelessTransactionManager());
factory.setDataSource(createDataSourceForRepository());
factory.setDatabaseType("HSQL");
return factory.getObject();
}
public DataSource createDataSourceForRepository() {
return DataSourceBuilder.create()
.url("jdbc:hsqldb:file:src/main/resources/hsqldb/batchcore.db;hsqldb.lock_file=false;shutdown=true;")
.driverClassName("org.hsqldb.jdbcDriver")
.username("sa")
.password("")
.build();
}
public DataSource createDataSourceForRepository() {
return DataSourceBuilder.create()
.url("jdbc:postgresql://localhost:5432/bdauser")
.driverClassName("org.postgresql.Driver")
.username("bdauser")
.password("bdauser")
.build();
}
@Bean
public BatchConfigurer batchConfigurer() {
return new DefaultBatchConfigurer(createDataSourceForRepository()) {
@Override
public JobRepository getJobRepository() {
JobRepository jobRepository = null;
try {
jobRepository = createMyJobRepository();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("************************WORKER replying Inside batchConfigurer ****************");
return jobRepository;
}
};
}
*/
@Bean
@ServiceActivator(inputChannel = "replies")
public MessageHandler handler() throws Exception {
System.out.println("************************ worker inside serviceactivator **********************");
KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("reply"));
return handler;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory);
}
@Bean
public KafkaMessageListenerContainer<String, String> replyContainer(ConsumerFactory<String, String> cf) {
ContainerProperties containerProperties = new ContainerProperties(new TopicPartitionOffset("nullChannel", 0));
System.out.println("************************** WORKER ContainerProperties *****************************");
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return new KafkaMessageListenerContainer<>(cf, containerProperties);
}
@Primary
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> producerFactory, KafkaMessageListenerContainer<String, String> repliesContainer) {
System.out.println("**************************WORKER replying Template Templet *****************************");
ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory,repliesContainer);
replyingKafkaTemplate.setSharedReplyTopic(true);
Duration d = Duration.ofSeconds(50);
replyingKafkaTemplate.setDefaultReplyTimeout(d);
return replyingKafkaTemplate;
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
System.out.println("**************** worker node ConcurrentMessageListenerContainer *****************************");
ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("requests");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public IntegrationFlow serverGateway(ConcurrentMessageListenerContainer<String, String> container, KafkaTemplate<String, String> template) {
return IntegrationFlows
.from(Kafka.inboundGateway(container, template)
.replyTimeout(3000000))
.channel(requests())
.get();
}
@Bean
public Job remotePartitioningJob() {
System.out.println("******************* inside remotePartitioningJob **************************");
return this.jobBuilderFactory.get("remotePartitioningJobMy")
.incrementer(new RunIdIncrementer())
.start(workerStep())
.build();
}
/*
* Configure the worker step
*/
@Bean
public Step workerStep() {
System.out.println("******************* inside worker step **************************");
return this.workerStepBuilderFactory.get("workerStep")
.inputChannel(requests())
.outputChannel(replies())
.tasklet(tasklet(null))
.build();
}
@Bean
@StepScope
public Tasklet tasklet(@Value("#{stepExecutionContext['partition']}") String partition) {
return (contribution, chunkContext) -> {
System.out.println("processing " + partition);
return RepeatStatus.FINISHED;
};
}
@Bean
public PartitionHandler partitionHandler() throws Exception {
MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
partitionHandler.setStepName("slaveStep");
partitionHandler.setGridSize(10);
partitionHandler.setMessagingOperations(messageTemplate());
partitionHandler.setPollInterval(5000l);
partitionHandler.setJobExplorer(this.jobExplorer);
partitionHandler.afterPropertiesSet();
return partitionHandler;
}
@Bean
public MessagingTemplate messageTemplate() {
MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());
messagingTemplate.setReceiveTimeout(60000000l);
return messagingTemplate;
}
@Bean
public ExecutorChannel outboundRequests() {
return MessageChannels.executor("outboundRequests", new SimpleAsyncTaskExecutor()).get();
}
public static void main(String[] args) {
System.out.println("************************WORKER CONFIGURATION ****************");
SpringApplication.run(WorkerConfiguration.class, args);
}
}属性文件
spring.main.allow-bean-definition-overriding=true
spring.kafka.bootstrap-servers=cdh5161-e2e-test-1.eaas.amdocs.com:9092,cdh5161-e2e-test-2.eaas.amdocs.com:9092,cdh5161-e2e-test-3.eaas.amdocs.com:9092,cdh5161-e2e-test-4.eaas.amdocs.com:9092,cdh5161-e2e-test-5.eaas.amdocs.com:9092,cdh5161-e2e-test-6.eaas.amdocs.com:9092,cdh5161-e2e-test-7.eaas.amdocs.com:9092,cdh5161-e2e-test-8.eaas.amdocs.com:9092
spring.kafka.consumer.group-id=remotePartitioningConsuerGroup
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.properties.spring.json.trusted.packages=*
server.port=8050
## PostgreSQL
spring.datasource.url=jdbc:postgresql://localhost:5432/bdauser
spring.datasource.username=bdauser
spring.datasource.password=bdauser
#drop n create table again, good for testing, comment this in production
spring.jpa.hibernate.ddl-auto=create发布于 2021-09-16 11:13:41
因为你使用的是json.StepExecution类和JobExecution类有双向的关系。因此,在尝试序列化要发送回应答通道的StepExecution对象时,它会导致无限递归。
https://stackoverflow.com/questions/64485624
复制相似问题