状态机

最后发布时间:2026-02-04 11:54:21 浏览量:

AnalysisEvent.java

package com.bioproj.analysis.statemachine;

public enum AnalysisEvent {

    // 运行分析数据
    RUN_ANALYSIS_DATA,

    // 分析结束
    ANALYSIS_END
}

AnalysisPersist.java

package com.bioproj.analysis.statemachine;

import com.bioproj.analysis.entity.SeqSampleAnalysis;
import com.mbiolance.common.domain.SampleStatus;
import org.springframework.statemachine.StateMachineContext;
import org.springframework.statemachine.StateMachinePersist;
import org.springframework.statemachine.support.DefaultStateMachineContext;

public class AnalysisPersist implements StateMachinePersist<SampleStatus, AnalysisEvent, SeqSampleAnalysis> {

    @Override
    public void write(StateMachineContext<SampleStatus, AnalysisEvent> stateMachineContext, SeqSampleAnalysis seqSampleAnalysis) throws Exception {
    }

    @Override
    public StateMachineContext<SampleStatus, AnalysisEvent> read(SeqSampleAnalysis seqSampleAnalysis) throws Exception {
        return new DefaultStateMachineContext<>(seqSampleAnalysis.getStatus(),null,null,null);
    }

}

AnalysisPersistConfig.java

package com.bioproj.analysis.statemachine;

import com.bioproj.analysis.entity.SeqSampleAnalysis;
import com.mbiolance.common.domain.SampleStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.persist.DefaultStateMachinePersister;
import org.springframework.statemachine.persist.StateMachinePersister;

@Configuration
public class AnalysisPersistConfig {

    @Bean
    public AnalysisPersist reportPersist() {
        return new AnalysisPersist();
    }

    @Bean
    public StateMachinePersister<SampleStatus, AnalysisEvent, SeqSampleAnalysis> stateMachinePersister() {
        return new DefaultStateMachinePersister<>(reportPersist());
    }

}

AnalysisStateMachineListener.java

package com.bioproj.analysis.statemachine;

import com.mbiolance.common.domain.SampleStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.listener.StateMachineListenerAdapter;
import org.springframework.statemachine.state.State;
import org.springframework.statemachine.transition.Transition;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class AnalysisStateMachineListener extends StateMachineListenerAdapter<SampleStatus, AnalysisEvent> {

    @Override
    public void transition(Transition<SampleStatus, AnalysisEvent> transition) {
        SampleStatus targetStatus = null;
        SampleStatus sourceStatus = null;
        State<SampleStatus, AnalysisEvent> target = transition.getTarget();
        State<SampleStatus, AnalysisEvent> source = transition.getSource();

        if (target != null) {
            targetStatus = target.getId();
        }
        if (source != null) {
            sourceStatus = source.getId();
        }

        log.info("分析状态变更,原状态:{},目标状态:{}", sourceStatus, targetStatus);

    }

    @Override
    public void stateMachineError(StateMachine<SampleStatus, AnalysisEvent> stateMachine, Exception exception) {

        System.out.println();
    }

}

AnalysisStatusMachineConfig.java

package com.bioproj.analysis.statemachine;

import com.mbiolance.common.domain.SampleStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.config.EnableStateMachine;
import org.springframework.statemachine.config.StateMachineConfigurerAdapter;
import org.springframework.statemachine.config.builders.StateMachineConfigurationConfigurer;
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;

import java.util.EnumSet;

@Configuration
@EnableStateMachine
public class AnalysisStatusMachineConfig extends StateMachineConfigurerAdapter<SampleStatus, AnalysisEvent> {

    @Autowired
    private AnalysisStateMachineListener listener;

    @Override
    public void configure(StateMachineConfigurationConfigurer<SampleStatus, AnalysisEvent> config) throws Exception {
        config.withConfiguration().listener(listener);
    }

    @Override
    public void configure(StateMachineStateConfigurer<SampleStatus, AnalysisEvent> states) throws Exception {
        states.withStates().initial(SampleStatus.CREATE)
                .end(SampleStatus.ANALYSIS_END)
                .states(EnumSet.allOf(SampleStatus.class));
    }

    @Override
    public void configure(StateMachineTransitionConfigurer<SampleStatus, AnalysisEvent> transitions) throws Exception {
        transitions.withExternal().source(SampleStatus.CREATE).target(SampleStatus.ANALYSIS_DOING)
                .event(AnalysisEvent.RUN_ANALYSIS_DATA).and()
                .withExternal().source(SampleStatus.ANALYSIS_DOING).target(SampleStatus.ANALYSIS_END)
                .event(AnalysisEvent.ANALYSIS_END);
    }

}

SeqSampleAnalysisStatusService.java

package com.bioproj.analysis.statemachine;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.bioproj.analysis.aop.RedisLock;
import com.bioproj.analysis.entity.SeqSampleAnalysis;
import com.bioproj.analysis.observer.AnalysisObserverHandles;
import com.bioproj.analysis.repository.SeqSampleAnalysisRepository;
import com.bioproj.analysis.service.AbstractSeqSampleAnalysisService;
import com.bioproj.controller.SendKafkaController;
import com.bioproj.domain.vo.SendKafkaVo;
import com.bioproj.pojo.task.Workflow;
import com.bioproj.service.IWorkflowService;
import com.mbiolance.cloud.auth.common.SysUserInfoContext;
import com.mbiolance.cloud.auth.common.SystemRuntimeException;
import com.mbiolance.cloud.sample.rpc.detection.SeqProjectSampleFeignService;
import com.mbiolance.common.domain.AnalysisRunMode;
import com.mbiolance.common.domain.SampleStatus;
import com.mbiolance.common.domain.SeqSampleAnalysisDto;
import com.mbiolance.common.platform.SeqSampleDataDto;
import com.mbiolance.common.rpc.CommonReportFeignService;
import com.mbiolance.common.utils.NumberGenerate;
import com.mbiolance.common.utils.SJBeanUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.annotation.OnTransition;
import org.springframework.statemachine.annotation.WithStateMachine;
import org.springframework.statemachine.persist.StateMachinePersister;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;

@Slf4j
@Component
@WithStateMachine
public class SeqSampleAnalysisStatusService extends AbstractSeqSampleAnalysisService<SeqSampleAnalysis> {

    @Autowired
    private StateMachine<SampleStatus, AnalysisEvent> stateMachine;

    @Autowired
    private StateMachinePersister<SampleStatus, AnalysisEvent, SeqSampleAnalysis> stateMachinePersister;

    @Autowired
    private CommonReportFeignService commonReportFeignService;

    @Autowired
    private SeqSampleAnalysisRepository seqSampleAnalysisRepository;

    @Autowired
    private AnalysisObserverHandles analysisObserverHandles;

    @Autowired
    private SendKafkaController sendKafkaController;

    @OnTransition(source = "CREATE", target = "ANALYSIS_DOING")
    public boolean runAnalysisDataTransition(Message<SeqSampleAnalysis> message) {
        SeqSampleAnalysis seqSampleAnalysis = (SeqSampleAnalysis) message.getHeaders().get("seqSampleAnalysis");
        if (seqSampleAnalysis == null || seqSampleAnalysis.getId() == null) {
            return false;
        }
        Map<?, ?> objs = this.getObjs(message);
        String clusterId = ObjectUtil.isEmpty(objs.get("clusterId")) ? null : String.valueOf(objs.get("clusterId"));
        try {
            log.info("添加到kafka:{},cluster:{}", seqSampleAnalysis.getAnalysisNumber(), clusterId);
            sendKafkaController.runAnalysisData(SendKafkaVo.builder()
                    .workflowId(seqSampleAnalysis.getSxProcessId())
                    .analysisNumber(seqSampleAnalysis.getAnalysisNumber())
                    .experimentNumber(seqSampleAnalysis.getExperimentNumber())
                    .sampleNumber(seqSampleAnalysis.getSampleNumber())
                    .fastq1(seqSampleAnalysis.getFastq1())
                    .fastq2(seqSampleAnalysis.getFastq2())
                    .build());
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        SeqSampleAnalysis analysis = seqSampleAnalysisRepository.findById(seqSampleAnalysis.getId()).orElseThrow(() -> new SystemRuntimeException("数据不存在"));
        analysis.setStatus(SampleStatus.ANALYSIS_DOING);
        analysis.setUpdateTime(new Date());
        seqSampleAnalysisRepository.save(analysis);
        analysisObserverHandles.notifyRunAnalysisData(analysis, SysUserInfoContext.getUser());
        return true;
    }

    @OnTransition(source = "ANALYSIS_DOING", target = "ANALYSIS_END")
    public boolean analysisEndTransition(Message<SeqSampleAnalysis> message) {
        SeqSampleAnalysis seqSampleAnalysis = (SeqSampleAnalysis) message.getHeaders().get("seqSampleAnalysis");
        if (seqSampleAnalysis == null || seqSampleAnalysis.getId() == null) {
            return false;
        }
        SampleStatus originStatus = seqSampleAnalysis.getStatus();
        Map<?, ?> objs = this.getObjs(message);
        String templateNumber = ObjectUtil.isEmpty(objs.get("templateNumber")) ? null : String.valueOf(objs.get("templateNumber"));
        SeqSampleAnalysisDto seqSampleAnalysisDto = BeanUtil.copyProperties(seqSampleAnalysis, SeqSampleAnalysisDto.class);
        commonReportFeignService.createReport(seqSampleAnalysisDto, templateNumber);
        seqSampleAnalysis.setStatus(SampleStatus.ANALYSIS_END);
        seqSampleAnalysis.setUpdateTime(new Date());
        seqSampleAnalysisRepository.saveAndFlush(seqSampleAnalysis);
        analysisObserverHandles.notifyAnalysisEnd(seqSampleAnalysis, originStatus, SysUserInfoContext.getUser());
        return true;
    }

    private Map<?, ?> getObjs(Message<SeqSampleAnalysis> message) {
        Object obj = message.getHeaders().get("objs");
        return Convert.convert(Map.class, obj);
    }

    @RedisLock(value = "'analysisEvent_' +  #seqSampleAnalysis.analysisNumber", isBlocked = false)
    public boolean sendEvent0(SeqSampleAnalysis seqSampleAnalysis, AnalysisEvent event, Map<String, Object> objs) {
        Message<AnalysisEvent> message = MessageBuilder.withPayload(event)
                .setHeader("seqSampleAnalysis", seqSampleAnalysis)
                .setHeader("objs", objs).build();
        boolean result = false;
        try {
            stateMachine.start();
            stateMachinePersister.restore(stateMachine, seqSampleAnalysis);
            result = stateMachine.sendEvent(message);
            stateMachinePersister.persist(stateMachine, seqSampleAnalysis);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (Objects.nonNull(seqSampleAnalysis) && Objects.equals(seqSampleAnalysis.getStatus(), SampleStatus.ANALYSIS_END)) {
                stateMachine.stop();
            }
        }
        return result;
    }

}

快捷入口
java 思维导图 浏览PDF 下载PDF
分享到:
标签