标签归档

CompletableFuture 结合 java21 虚线程

try( ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor() ) {
            List<CompletableFuture<Void>> f = list.stream().map(item ->
                    CompletableFuture.runAsync(() -> {
                        try {
                            processOrder(item);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }, executorService)
            ).toList();
            CompletableFuture.allOf(f.toArray(new CompletableFuture[0])).join();
}

使用 Executors.newVirtualThreadPerTaskExecutor() 作为 CompletableFuture.runAsync的 Executor 即可。对于老代码无痛更改

rocketMQ-封装库 使用

一,引入本地依赖, 注意自己的电脑是否配置有本地库,没配置问一下别人

     <!--<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.3.0</version>
     </dependency>-->  这个就不要了,关于 rocketmq的一切依赖都可以删掉,只需要保留以下本地库

        <dependency>
            <groupId>org.dxstudio</groupId>
            <artifactId>dx-rocketmq-starter</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

二,配置文件增加以下内容

environment: local  #这里各个环境要使用不同的环境标记,叫什么不重要,目前统一 local | dev | pron | prod
rocketmq:
  consumer:
    # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
    pull-batch-size: 10
  producer:
    group: dx-dump-group
    # 发送消息超时时间,默认3000
    sendMessageTimeout: 3000
    # 发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2
    # 异步消息重试此处,默认2
    retryTimesWhenSendAsyncFailed: 2
    # 消息最大长度,默认1024 * 1024 * 4(默认4M)
    maxMessageSize: 4096
    # 压缩消息阈值,默认4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在内部发送失败时重试另一个broker,默认false
    retryNextServer: false
  name-server: 192.168.222.10:9876

三,main上添加注解

为了自动注入三方库中的类。因为在配置中做了 注解拦截和修改,用于自动隔离环境,自动更改你的侦听器里的group和topic

@MapperScan("org.dxstudio.dump.core.mapper")
@EnableAsync
@SpringBootApplication
@ComponentScan("org.dxstudio.*")  //加上这个
public class DumpApplication {

    public static void main(String[] args) {
        SpringApplication.run(DumpApplication.class, args);
    }

}

四,发布消息

已经使用项目组的规则做了封装,为保持统一,虽然没有二次封装 rocketMQTemplate,也请使用封装好的 topic 和message 来

@Autowired
RocketMQTemplate rocketMQTemplate;

private SendResult sendMsg(int taskId ){
        //第一个参数是标签,我们约定为事件类型,
        //第二个参数是消息的key(用于在mq后台查询),这里如果是订单 直接订单号,其他使用ID即可
        //第三个参数是你要发送的消息体,范型,爱传什么传什么
        DxMQTpl<Integer> t = new DxMQTpl<>("task", String.valueOf( taskId ), taskId);

        //没有再二次封装 rocketMQTemplate, 所以可以网上搜索这个的发送方法即可,没有特别情况一般就用 syncSend
        SendResult result = rocketMQTemplate.syncSend( t.topic(), t.message() );
        log.info("发送消息 {}", result) ;
        return result;

}

五,消费消息

消费消息已经封装好了处理方法,继承DxMQMessageHandler<T>、并实现 RocketMQListener< DxMQMessage<T> > 即可。

@RocketMQMessageListener 这个是 rocketMQTemplate 的注释,可网上查资料,一般就用我下面的即可,改成自己要订阅的频道

@Slf4j
@RocketMQMessageListener(consumerGroup = "dx-dump-group",
        topic = "dx-dump",
        selectorExpression = "task",
        selectorType = SelectorType.TAG,
        consumeThreadMax = 1,
        consumeThreadNumber = 1)
public class Consumer extends DxMQMessageHandler<Long> implements RocketMQListener< DxMQMessage<Long> > {
    @Autowired
    TaskScanService taskScanService;
    
    //这里固定写 执行超类方法即可
    @Override
    public void onMessage( DxMQMessage<Long> msg) {
        super.dispatchMessage( msg );
    }
    
    //这里是自己真正要处理的逻辑,有问题毫无犹豫抛出异常即可
    @Override
    protected void handleMessage(DxMQMessage<Long> message) throws Exception {
        log.info("处理消息 {}", message);
//        throw new RuntimeException("测试一下异常");
    }
    
    //这里是异常时要做的事情,比如标记异常。 
    //这里必须是重试都失败了才会最后触发,比如关闭 isRetry 和 throwException。 throwException开启会触发rocketmq的自动重发
    @Override
    protected void handleMaxRetriesExceeded(DxMQMessage<Long> message) {
        log.error("消费失败,后续处理,比如标记异常");
    }
    

    //过滤方法,返回true不会执行handleMessage。 用于限制重复的消息消费(rocketmq不会保证不重复),比如订单已经处理过了,在这里个方法里检查好,就不应该再继续处理
    @Override
    protected boolean filter(DxMQMessage<Long> message) {
        return super.filter(message);
    }
    
    //如果要自己修改重试规则(mq的重试是一直会有),开启true,则每次发生异常,是由代码控制 重新发消息。这样自己可以控制重试多少次(5秒间隔),达到最大次数后触发handleMaxRetriesExceeded
    //这个要看自己的业务,允许异常的业务 用这个,不允许异常的,使用mq的重试
    @Override
    protected boolean isRetry() {
        return true;
    }

    @Override
    protected boolean throwException() {
        return true;
    }
}

六,附上rocketmq的重发机制

springboot+modelMapper+mybatis的枚举和JSON处理

一,依赖

<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.48</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
<version>3.5.5</version>
</dependency>
<dependency>
<groupId>org.modelmapper</groupId>
<artifactId>modelmapper</artifactId>
<version>3.2.0</version>

二,枚举使用

(1)定义枚举

public enum StoreTypeEnum {

    OSS(0,"阿里OSS服务"),
    S3(1,"亚马逊S3服务");

    StoreTypeEnum( int code, String des ){
        this.code = code;
        this.des = des;
    }

    @EnumValue
    private final int code;

    private final String des; //有jsonvalue注释,转换值为des,否则为枚举名

}

(2)接收参数中使用和验证枚举

@Data
public class StoreConfigParam {

    @NotEmpty(message = "键名不能为空")
    private String storeKey;

    @NotEmpty(message = "描述不能为空")
    private String des;

    @NotNull(message = "配置的类型不能为空")
    private StoreTypeEnum storeType;

    private JSONObject storeMeta;

}

(3)数据库实体类中定义枚举

@TableName(autoResultMap = true)
@Data
public class StoreConfig {

    @TableId
    private String storeKey;
    private String des;
    private StoreTypeEnum storeType;

    @TableField( typeHandler = Fastjson2TypeHandler.class)
    private JSONObject storeMeta;

}

(4)使用枚举

public class StoreConfigHttp {

    @Autowired
    private StoreConfigMapper storeConfigMapper;

    @Autowired
    private ModelMapper modelMapper;

    @PostMapping("/store-config/add")
    public ResultRes add(@Validated @RequestBody StoreConfigParam param){
        StoreConfig storeConfig = modelMapper.map( param, StoreConfig.class );
        storeConfigMapper.insert( storeConfig );
        return ResultRes.success();

    }

    @PostMapping("/store-config/update")
    public ResultRes update(@Validated @RequestBody StoreConfigParam param){
        StoreConfig storeConfig = modelMapper.map( param, StoreConfig.class );
        storeConfig.setStoreType(StoreTypeEnum.S3); //这里故意多写一行演示设置枚举值,实际上 modelMapper可以自动帮我买转换
        storeConfigMapper.updateById( storeConfig);
        return ResultRes.success();
    }
}

使用modelMapper讲param转换成endity时,会自动转换枚举值,非常方便。查询时,也会将数据库中存的code(整型)自动转为枚举名(字符串类型)。避免接收或返回给前端无意义的状态数字

三,JSON使用

(1)关于Mysql的JSON字段说明

https://www.cnblogs.com/ivictor/p/16221712.html

(2)使用modelMapper自动转换模型

modelMapper不能默认自动转换json高级类型,但它提供自定义converter方法。我们只需要在配置中加入以下代码。这里有2种方法,JSONObject转string,存数据库也行,JSONObject转JSONObject也行(听起来很奇怪,但即使完全相同的类型,modelMapper确实不能识别),Mybatis可以完成JSONObject的存储

@Configuration
public class ModelMapperConfig {

    @Bean
    public ModelMapper modelMapper(){

        ModelMapper modelMapper = new ModelMapper();
        modelMapper.getConfiguration().setFullTypeMatchingRequired(true);
        modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
         
        //从JSONObject类型转为 entidy 的JSONObject类型
        Converter< JSONObject, JSONObject > converter = new Converter<JSONObject, JSONObject>() {
            @Override
            public JSONObject convert(MappingContext<JSONObject, JSONObject> mappingContext) {
                return mappingContext.getSource();
            }
        };

        modelMapper.addConverter(converter);
        return modelMapper;

    }

}

(3)mybatis的自适应

entidy模型,必须开启注释@TableName(autoResultMap = true),并在json字段注释@tableField,指定使用typeHandler为json。mybatis默认提供了fastjson\fastjson2\jackson等的typehandler,也可以利用typehandler机制自己实现

@TableName(autoResultMap = true) //必须加这个选项
@Data
public class StoreConfig {

    @TableId
    private String storeKey;
    private String des;
    private StoreTypeEnum storeType;

    @TableField( typeHandler = Fastjson2TypeHandler.class) //mybatis 自带的TypeHandler可以处理json
    private JSONObject storeMeta;

}

完毕后,即可以使用JSONObject插入、更新字段,查询出内容后也会自动转换成JSON格式输出

{
    "code": 0,
    "message": null,
    "data": {
        "records": [
            {
                "storeKey": "tes2",
                "des": "这是一个测试配置da",
                "storeType": "S3",
                "storeMeta": {
                    "key": "wawa444"
                }
            },
   }
}