AnalysisEvent.java
package com.bioproj.analysis.statemachine; public enum AnalysisEvent { // 运行分析数据 RUN_ANALYSIS_DATA, // 分析结束 ANALYSIS_END }
AnalysisPersist.java
package com.bioproj.analysis.statemachine; import com.bioproj.analysis.entity.SeqSampleAnalysis; import com.mbiolance.common.domain.SampleStatus; import org.springframework.statemachine.StateMachineContext; import org.springframework.statemachine.StateMachinePersist; import org.springframework.statemachine.support.DefaultStateMachineContext; public class AnalysisPersist implements StateMachinePersist<SampleStatus, AnalysisEvent, SeqSampleAnalysis> { @Override public void write(StateMachineContext<SampleStatus, AnalysisEvent> stateMachineContext, SeqSampleAnalysis seqSampleAnalysis) throws Exception { } @Override public StateMachineContext<SampleStatus, AnalysisEvent> read(SeqSampleAnalysis seqSampleAnalysis) throws Exception { return new DefaultStateMachineContext<>(seqSampleAnalysis.getStatus(),null,null,null); } }
AnalysisPersistConfig.java
package com.bioproj.analysis.statemachine;

import com.bioproj.analysis.entity.SeqSampleAnalysis;
import com.mbiolance.common.domain.SampleStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.persist.DefaultStateMachinePersister;
import org.springframework.statemachine.persist.StateMachinePersister;

/**
 * Spring configuration that exposes the persistence beans used with the analysis state machine.
 */
@Configuration
public class AnalysisPersistConfig {

    /**
     * @return a new {@link AnalysisPersist}
     */
    @Bean
    public AnalysisPersist reportPersist() {
        return new AnalysisPersist();
    }

    /**
     * @return a {@link DefaultStateMachinePersister} built on top of {@link #reportPersist()}
     */
    @Bean
    public StateMachinePersister<SampleStatus, AnalysisEvent, SeqSampleAnalysis> stateMachinePersister() {
        AnalysisPersist persist = reportPersist();
        return new DefaultStateMachinePersister<>(persist);
    }
}
AnalysisStateMachineListener.java
package com.bioproj.analysis.statemachine; import com.mbiolance.common.domain.SampleStatus; import lombok.extern.slf4j.Slf4j; import org.springframework.statemachine.StateMachine; import org.springframework.statemachine.listener.StateMachineListenerAdapter; import org.springframework.statemachine.state.State; import org.springframework.statemachine.transition.Transition; import org.springframework.stereotype.Component; @Slf4j @Component public class AnalysisStateMachineListener extends StateMachineListenerAdapter<SampleStatus, AnalysisEvent> { @Override public void transition(Transition<SampleStatus, AnalysisEvent> transition) { SampleStatus targetStatus = null; SampleStatus sourceStatus = null; State<SampleStatus, AnalysisEvent> target = transition.getTarget(); State<SampleStatus, AnalysisEvent> source = transition.getSource(); if (target != null) { targetStatus = target.getId(); } if (source != null) { sourceStatus = source.getId(); } log.info("分析状态变更,原状态:{},目标状态:{}", sourceStatus, targetStatus); } @Override public void stateMachineError(StateMachine<SampleStatus, AnalysisEvent> stateMachine, Exception exception) { System.out.println(); } }
AnalysisStatusMachineConfig.java
package com.bioproj.analysis.statemachine; import com.mbiolance.common.domain.SampleStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.statemachine.config.EnableStateMachine; import org.springframework.statemachine.config.StateMachineConfigurerAdapter; import org.springframework.statemachine.config.builders.StateMachineConfigurationConfigurer; import org.springframework.statemachine.config.builders.StateMachineStateConfigurer; import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer; import java.util.EnumSet; @Configuration @EnableStateMachine public class AnalysisStatusMachineConfig extends StateMachineConfigurerAdapter<SampleStatus, AnalysisEvent> { @Autowired private AnalysisStateMachineListener listener; @Override public void configure(StateMachineConfigurationConfigurer<SampleStatus, AnalysisEvent> config) throws Exception { config.withConfiguration().listener(listener); } @Override public void configure(StateMachineStateConfigurer<SampleStatus, AnalysisEvent> states) throws Exception { states.withStates().initial(SampleStatus.CREATE) .end(SampleStatus.ANALYSIS_END) .states(EnumSet.allOf(SampleStatus.class)); } @Override public void configure(StateMachineTransitionConfigurer<SampleStatus, AnalysisEvent> transitions) throws Exception { transitions.withExternal().source(SampleStatus.CREATE).target(SampleStatus.ANALYSIS_DOING) .event(AnalysisEvent.RUN_ANALYSIS_DATA).and() .withExternal().source(SampleStatus.ANALYSIS_DOING).target(SampleStatus.ANALYSIS_END) .event(AnalysisEvent.ANALYSIS_END); } }
SeqSampleAnalysisStatusService.java
package com.bioproj.analysis.statemachine; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.convert.Convert; import cn.hutool.core.util.ObjectUtil; import cn.hutool.extra.spring.SpringUtil; import com.bioproj.analysis.aop.RedisLock; import com.bioproj.analysis.entity.SeqSampleAnalysis; import com.bioproj.analysis.observer.AnalysisObserverHandles; import com.bioproj.analysis.repository.SeqSampleAnalysisRepository; import com.bioproj.analysis.service.AbstractSeqSampleAnalysisService; import com.bioproj.controller.SendKafkaController; import com.bioproj.domain.vo.SendKafkaVo; import com.bioproj.pojo.task.Workflow; import com.bioproj.service.IWorkflowService; import com.mbiolance.cloud.auth.common.SysUserInfoContext; import com.mbiolance.cloud.auth.common.SystemRuntimeException; import com.mbiolance.cloud.sample.rpc.detection.SeqProjectSampleFeignService; import com.mbiolance.common.domain.AnalysisRunMode; import com.mbiolance.common.domain.SampleStatus; import com.mbiolance.common.domain.SeqSampleAnalysisDto; import com.mbiolance.common.platform.SeqSampleDataDto; import com.mbiolance.common.rpc.CommonReportFeignService; import com.mbiolance.common.utils.NumberGenerate; import com.mbiolance.common.utils.SJBeanUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.statemachine.StateMachine; import org.springframework.statemachine.annotation.OnTransition; import org.springframework.statemachine.annotation.WithStateMachine; import org.springframework.statemachine.persist.StateMachinePersister; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; import 
java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.*; @Slf4j @Component @WithStateMachine public class SeqSampleAnalysisStatusService extends AbstractSeqSampleAnalysisService<SeqSampleAnalysis> { @Autowired private StateMachine<SampleStatus, AnalysisEvent> stateMachine; @Autowired private StateMachinePersister<SampleStatus, AnalysisEvent, SeqSampleAnalysis> stateMachinePersister; @Autowired private CommonReportFeignService commonReportFeignService; @Autowired private SeqSampleAnalysisRepository seqSampleAnalysisRepository; @Autowired private AnalysisObserverHandles analysisObserverHandles; @Autowired private SendKafkaController sendKafkaController; @OnTransition(source = "CREATE", target = "ANALYSIS_DOING") public boolean runAnalysisDataTransition(Message<SeqSampleAnalysis> message) { SeqSampleAnalysis seqSampleAnalysis = (SeqSampleAnalysis) message.getHeaders().get("seqSampleAnalysis"); if (seqSampleAnalysis == null || seqSampleAnalysis.getId() == null) { return false; } Map<?, ?> objs = this.getObjs(message); String clusterId = ObjectUtil.isEmpty(objs.get("clusterId")) ? 
null : String.valueOf(objs.get("clusterId")); try { log.info("添加到kafka:{},cluster:{}", seqSampleAnalysis.getAnalysisNumber(), clusterId); sendKafkaController.runAnalysisData(SendKafkaVo.builder() .workflowId(seqSampleAnalysis.getSxProcessId()) .analysisNumber(seqSampleAnalysis.getAnalysisNumber()) .experimentNumber(seqSampleAnalysis.getExperimentNumber()) .sampleNumber(seqSampleAnalysis.getSampleNumber()) .fastq1(seqSampleAnalysis.getFastq1()) .fastq2(seqSampleAnalysis.getFastq2()) .build()); } catch (Exception e) { e.printStackTrace(); return false; } SeqSampleAnalysis analysis = seqSampleAnalysisRepository.findById(seqSampleAnalysis.getId()).orElseThrow(() -> new SystemRuntimeException("数据不存在")); analysis.setStatus(SampleStatus.ANALYSIS_DOING); analysis.setUpdateTime(new Date()); seqSampleAnalysisRepository.save(analysis); analysisObserverHandles.notifyRunAnalysisData(analysis, SysUserInfoContext.getUser()); return true; } @OnTransition(source = "ANALYSIS_DOING", target = "ANALYSIS_END") public boolean analysisEndTransition(Message<SeqSampleAnalysis> message) { SeqSampleAnalysis seqSampleAnalysis = (SeqSampleAnalysis) message.getHeaders().get("seqSampleAnalysis"); if (seqSampleAnalysis == null || seqSampleAnalysis.getId() == null) { return false; } SampleStatus originStatus = seqSampleAnalysis.getStatus(); Map<?, ?> objs = this.getObjs(message); String templateNumber = ObjectUtil.isEmpty(objs.get("templateNumber")) ? 
null : String.valueOf(objs.get("templateNumber")); SeqSampleAnalysisDto seqSampleAnalysisDto = BeanUtil.copyProperties(seqSampleAnalysis, SeqSampleAnalysisDto.class); commonReportFeignService.createReport(seqSampleAnalysisDto, templateNumber); seqSampleAnalysis.setStatus(SampleStatus.ANALYSIS_END); seqSampleAnalysis.setUpdateTime(new Date()); seqSampleAnalysisRepository.saveAndFlush(seqSampleAnalysis); analysisObserverHandles.notifyAnalysisEnd(seqSampleAnalysis, originStatus, SysUserInfoContext.getUser()); return true; } private Map<?, ?> getObjs(Message<SeqSampleAnalysis> message) { Object obj = message.getHeaders().get("objs"); return Convert.convert(Map.class, obj); } @RedisLock(value = "'analysisEvent_' + #seqSampleAnalysis.analysisNumber", isBlocked = false) public boolean sendEvent0(SeqSampleAnalysis seqSampleAnalysis, AnalysisEvent event, Map<String, Object> objs) { Message<AnalysisEvent> message = MessageBuilder.withPayload(event) .setHeader("seqSampleAnalysis", seqSampleAnalysis) .setHeader("objs", objs).build(); boolean result = false; try { stateMachine.start(); stateMachinePersister.restore(stateMachine, seqSampleAnalysis); result = stateMachine.sendEvent(message); stateMachinePersister.persist(stateMachine, seqSampleAnalysis); } catch (Exception e) { e.printStackTrace(); } finally { if (Objects.nonNull(seqSampleAnalysis) && Objects.equals(seqSampleAnalysis.getStatus(), SampleStatus.ANALYSIS_END)) { stateMachine.stop(); } } return result; } }
GitHub开源生信云平台 DEMO