Foreword

Hello everyone, and Happy New Year! It's a pleasure to meet you again. Good luck, peace and safety to you all, and may all your wishes come true!

Gobrs-Async has already brought you many useful, efficient and practical features. In the new year, let's welcome a bright new future with a fresh approach to development!

Dear technical friends, fasten your seat belts and get ready. Let's go!

Timeout task

As the name implies, a timeout can be set on a single task to decide whether that task is allowed to keep running. This is useful for both IO-bound and CPU-bound tasks.

How to use

Set the timeoutInMilliseconds attribute on the @Task annotation.

  • timeoutInMilliseconds: the timeout, as a fixed number of milliseconds

package com.gobrs.async.test.task.timeout;

import com.gobrs.async.core.TaskSupport;
import com.gobrs.async.core.anno.Task;
import com.gobrs.async.core.task.AsyncTask;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Task(timeoutInMilliseconds = 300)
public class CaseTimeoutTaskA extends AsyncTask {
    int i = 10000;

    @Override
    public void prepare(Object o) {
        log.info(this.getName() + " using thread---" + Thread.currentThread().getName());
    }

    @SneakyThrows
    @Override
    public String task(Object o, TaskSupport support) {
        System.out.println("CaseTimeoutTaskA Begin");
        Thread.sleep(400);
        for (int i1 = 0; i1 < i; i1++) {
            i1 += i1;
        }
        System.out.println("CaseTimeoutTaskA Finish");
        return "result";
    }
}

Timeout listener thread pool configuration

The timeout-core-size setting is the number of core threads in the thread pool that watches tasks for timeouts. For why this parameter needs to be configured, see the Special Note below.


gobrs:
  async:
    config:
      rules:
        - name: "chain1"
          content: "taskA->taskB->taskC"
      timeout-core-size: 200 # number of core threads

Special Note

  • Timeout tasks do not support thread reuse, because the timeout is enforced by controlling the thread that runs the task. If such a thread were reused, interrupting it could interrupt another task running on the same thread.
  • The circuit-breaking and degradation mechanism follows the Hystrix approach. If you are not familiar with Hystrix, see an analysis of the Hystrix circuit-breaking and degradation principle.
  • Because of thread scheduling, the effective timeout may be off by up to 10 ms, which can be ignored.
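
To make the first note more concrete, here is a minimal sketch of a watcher-based timeout using only the JDK. It is not the Gobrs-Async or Hystrix implementation; the class and method names (TimeoutWatcherSketch, runWithTimeout) are hypothetical, and the single-threaded watcher only stands in for the timeout listener pool. The point it illustrates is that the timeout is enforced by interrupting the worker thread, which is only safe when that thread runs nothing else.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not part of Gobrs-Async.
final class TimeoutWatcherSketch {

    /** Runs the task on a dedicated thread; a watcher interrupts it once timeoutMs has elapsed. */
    static String runWithTimeout(Callable<String> task, long timeoutMs) throws Exception {
        // One dedicated worker per call: interrupting it can never hit an unrelated task.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        // Stand-in for the timeout listener pool (assumption, see lead-in).
        ScheduledExecutorService watcher = Executors.newScheduledThreadPool(1);
        try {
            Future<String> future = worker.submit(task);
            // cancel(true) interrupts the worker if the task is still running; it is a no-op otherwise.
            watcher.schedule(() -> { future.cancel(true); }, timeoutMs, TimeUnit.MILLISECONDS);
            try {
                return future.get();
            } catch (CancellationException e) {
                return "TIMEOUT";
            }
        } finally {
            worker.shutdownNow();
            watcher.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Mirrors CaseTimeoutTaskA: 400 ms of work against a 300 ms limit.
        System.out.println(runWithTimeout(() -> { Thread.sleep(400); return "result"; }, 300)); // TIMEOUT
        System.out.println(runWithTimeout(() -> "result", 300));                                // result
    }
}
```

If the worker came from a shared, reused pool, the interrupt could land after the thread had already moved on to a different task, which is the hazard the note describes.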

Thread reuse

A small question

Imagine the following task flow. Under ideal conditions, what is the minimum number of threads needed to run it?

taskA->taskB,TaskC->taskD->taskE,taskF

Is it 6 threads, or 5, 4, 3, 2? You may have already guessed: 2 threads (not counting the main thread) are enough to complete this multi-threaded flow. Why? At any moment at most two tasks run concurrently, either TaskB and TaskC or TaskE and TaskF, and the number of threads needed is determined by how many tasks run at the same time. Now let's see how many threads Gobrs-Async actually uses.


Test case

Address

Run result


Main thread uses main
using main
TaskA
2022-12-11 13:58:22.581  INFO 8039 --- [           main] com.gobrs.async.core.task.AsyncTask      : <11780902254572224> [taskA] execution
using main
TaskC
using pool-2-thread-1
TaskB
2022-12-11 13:58:22.694  INFO 8039 --- [           main] com.gobrs.async.core.task.AsyncTask      : <11780902254572224> [TaskC] execution
2022-12-11 13:58:22.694  INFO 8039 --- [pool-2-thread-1] com.gobrs.async.core.task.AsyncTask      : <11780902254572224> [taskB] execution
using pool-2-thread-1
TaskD
2022-12-11 13:58:22.804  INFO 8039 --- [pool-2-thread-1] com.gobrs.async.core.task.AsyncTask      : <11780902254572224> [taskD] execution
using pool-2-thread-1
using pool-2-thread-2
TaskE
2022-12-11 13:58:22.909  INFO 8039 --- [pool-2-thread-2] com.gobrs.async.core.task.AsyncTask      : <11780902254572224> [taskE] execution
TaskF
2022-12-11 13:58:22.910  INFO 8039 --- [pool-2-thread-1] com.gobrs.async.core.task.AsyncTask      : <11780902254572224> [taskF] execution
2022-12-11 13:58:22.913  INFO 8039 --- [           main] com.gobrs.async.core.TaskLoader          : 【ProcessTrace】Total cost: 440ms | traceId = 11780902254572224 | 【task】taskA cost :103ms【state】:success; ->【task】taskB cost :103ms【state】:success; ->【task】TaskC cost :103ms【state】:success; ->【task】taskD cost :106ms【state】:success; ->【task】taskE cost :101ms【state】:success; ->【task】taskF cost :101ms【state】:success; 
elapsed 462

Conclusion

The log shows that TaskC ran on the same thread as TaskA (main). Because TaskB and TaskC run in parallel, a new thread had to be started to run TaskB. Once TaskB finished, TaskD continued on TaskB's thread, pool-2-thread-1. When TaskC finished, it found that its downstream task had already been picked up by the thread TaskB released, so it did not need to run that task on its own thread. The rest of the flow proceeds in the same way. In total, 3 threads (including the main thread) were used for the whole flow.
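
The counting argument from the small question can be sketched in plain Java. This is a simplified model, not the Gobrs-Async scheduler: it assumes the rule is a linear chain of stages separated by "->", with the tasks inside a stage separated by ",", and every name in it (StageWidthSketch, parseStages, maxConcurrency, runStages) is hypothetical. It computes the widest stage and then runs the flow on a pool of exactly that size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration only; not the Gobrs-Async rule engine.
final class StageWidthSketch {

    /** Splits "a->b,c->d" into stages: [[a], [b, c], [d]]. */
    static List<List<String>> parseStages(String rule) {
        List<List<String>> stages = new ArrayList<>();
        for (String stage : rule.split("->")) {
            stages.add(Arrays.asList(stage.split(",")));
        }
        return stages;
    }

    /** The widest stage bounds how many tasks can run at the same time. */
    static int maxConcurrency(List<List<String>> stages) {
        int max = 0;
        for (List<String> stage : stages) {
            max = Math.max(max, stage.size());
        }
        return max;
    }

    /** Runs the stages in order on a fixed pool and returns task name -> worker thread name. */
    static Map<String, String> runStages(List<List<String>> stages, int poolSize)
            throws InterruptedException {
        Map<String, String> threadByTask = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (List<String> stage : stages) {
                List<Callable<Void>> jobs = new ArrayList<>();
                for (String name : stage) {
                    jobs.add(() -> {
                        threadByTask.put(name, Thread.currentThread().getName());
                        Thread.sleep(20); // simulated work
                        return null;
                    });
                }
                pool.invokeAll(jobs); // blocks until the whole stage has finished
            }
        } finally {
            pool.shutdown();
        }
        return threadByTask;
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<String>> stages = parseStages("taskA->taskB,TaskC->taskD->taskE,taskF");
        int width = maxConcurrency(stages);
        System.out.println("max concurrency = " + width); // max concurrency = 2
        Map<String, String> threadByTask = runStages(stages, width);
        System.out.println("distinct worker threads = " + new HashSet<>(threadByTask.values()).size());
    }
}
```

All six tasks complete on at most two worker threads, which matches the answer to the question. Gobrs-Async additionally lets the calling thread take part, which is why the log above shows main plus two pool threads.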

Thread pool isolation

Introduction

Gobrs-Async provides a thread pool isolation mechanism. Different rules can use different thread pools, so a thread pool bottleneck in one rule does not affect the flows of other rules. If a rule has no thread pool configuration of its own, the unified thread pool configuration is used. If there is no unified configuration either, the SDK falls back to Executors.newCachedThreadPool() as the default thread pool.
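
The lookup order described above (rule pool, then unified pool, then a cached pool) can be sketched as follows. This is only an illustration of the fallback logic under that description, not the SDK's internal code; RulePoolResolverSketch and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration only; not the Gobrs-Async thread pool factory.
final class RulePoolResolverSketch {
    private final Map<String, ExecutorService> rulePools = new ConcurrentHashMap<>();
    private volatile ExecutorService unifiedPool; // stays null when no unified pool is configured
    private final ExecutorService fallback = Executors.newCachedThreadPool();

    void setRulePool(String ruleName, ExecutorService pool) {
        rulePools.put(ruleName, pool);
    }

    void setUnifiedPool(ExecutorService pool) {
        this.unifiedPool = pool;
    }

    /** Rule-specific pool first, then the unified pool, then the cached fallback. */
    ExecutorService resolve(String ruleName) {
        ExecutorService own = rulePools.get(ruleName);
        if (own != null) {
            return own;
        }
        ExecutorService unified = unifiedPool;
        return unified != null ? unified : fallback;
    }

    ExecutorService fallbackPool() {
        return fallback;
    }

    public static void main(String[] args) {
        RulePoolResolverSketch resolver = new RulePoolResolverSketch();
        ExecutorService caseOnePool = Executors.newFixedThreadPool(10);
        ExecutorService unified = Executors.newFixedThreadPool(20);

        System.out.println(resolver.resolve("caseOne") == resolver.fallbackPool()); // true
        resolver.setUnifiedPool(unified);
        System.out.println(resolver.resolve("caseOne") == unified);                 // true
        resolver.setRulePool("caseOne", caseOnePool);
        System.out.println(resolver.resolve("caseOne") == caseOnePool);             // true
        System.out.println(resolver.resolve("caseTwo") == unified);                 // true

        caseOnePool.shutdown();
        unified.shutdown();
        resolver.fallbackPool().shutdown();
    }
}
```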

Custom fixed thread pool (API method)

By default, Gobrs-Async uses the Executors.newCachedThreadPool() thread pool. To use a custom thread pool that meets your own needs, extend GobrsThreadPoolConfiguration and override the doInitialize method, as follows:


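import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.context.annotation.Configuration;
// Gobrs-Async imports for GobrsThreadPoolConfiguration and GobrsAsyncThreadPoolFactory are omitted here.

@Configuration
public class ThreadPoolConfig extends GobrsThreadPoolConfiguration {

    @Override
    protected void doInitialize(GobrsAsyncThreadPoolFactory factory) {
        // Custom thread pool.
        // Note: with an unbounded LinkedBlockingQueue the pool never grows beyond its 300 core threads.
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(300, 500, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());

        //  ExecutorService executorService = Executors.newCachedThreadPool();
        factory.setDefaultThreadPoolExecutor(threadPoolExecutor);
        // Set a rule-isolated thread pool; ruleName is the rule name configured in the yml.
        //  factory.setThreadPoolExecutor("ruleName", threadPoolExecutor);
    }
}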

Configuration method (YML)

Default thread pool configuration


gobrs:
  async:
    config:
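      ## If a rule does not specify a thread pool, the unified thread pool configuration is used. If the thread pool is updated dynamically through the API, the dynamic update replaces the thread pool configured in this file. See: ThreadPoolConfig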
      thread-pool:
        core-pool-size: 1000
        max-pool-size: 2000

Custom rule configuration

If a separate thread pool is configured for a rule, the rule's own configuration takes precedence. In the example below, the caseOne flow uses a thread pool configured with corePoolSize: 10 and maxPoolSize: 20.


gobrs:
  async:
    config:
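      ## If a rule does not specify a thread pool, the unified thread pool configuration is used. If the thread pool is updated dynamically through the API, the dynamic update replaces the thread pool configured in this file. See: ThreadPoolConfig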
      thread-pool:
        core-pool-size: 1000
        max-pool-size: 2000
      rules:
        - name: "caseOne"
          content: "caseOneTaskA->caseOneTaskB,caseOneTaskC,caseOneTaskD"
          threadPool:
            corePoolSize: 10
            maxPoolSize: 20
        # Official scenario 2 https://async.sizegang.cn/pages/2f844b/#%E5%9C%BA%E6%99%AF%E4%BA%8C
        - name: "caseTwo"
          content: "caseTwoTaskA->caseTwoTaskB->caseTwoTaskC,caseTwoTaskD"
          threadPool:
            corePoolSize: 30
            maxPoolSize: 40

Hot update thread pool configuration

Developers may run into this problem: the thread pool is loaded from application.yml when the project initializes, and once the program is running it can no longer be modified. If your company has a distributed configuration center that can update values in application memory in real time, Gobrs-Async provides an entry point for that as well.

Our company has its own hot-update component, so it can be used as follows:

Configuration center configuration


{
  "corePoolSize": 210,
  "maxPoolSize": 600,
  "keepAliveTime": 30,
  "capacity": 10000,
  "threadNamePrefix": "m-detail",
  "rejectedExecutionHandler": "CallerRunsPolicy"
}

@Slf4j
@Configuration
public class ThreadPoolConfig {

    @Autowired
    private GobrsAsyncThreadPoolFactory factory;

    @Resource
    private DUCCConfigService duccConfigService;

    @PostConstruct
    public void gobrsThreadPoolExecutor() {
        // Fetch the thread pool configuration from the configuration center.
        // DuccConstant.GOBRS_ASYNC_THREAD_POOL is the key under which it is stored.
        String config = duccConfigService.getString(DuccConstant.GOBRS_ASYNC_THREAD_POOL);
        ThreadPool threadPool = JSONObject.parseObject(config, ThreadPool.class);

        // Build the thread pool with the builder provided by gobrs-async
        ThreadPoolExecutor executor = ThreadPoolBuilder.buildByThreadPool(threadPool);
        factory.setDefaultThreadPoolExecutor(executor); // set the default thread pool
        //     factory.setThreadPoolExecutor("ruleName",threadPoolExecutor);  // set a rule-isolated thread pool

        listenerDucc();
    }

    // Listen for thread pool changes in the configuration center
    private void listenerDucc() {
        duccConfigService.addListener(new DuccListener(DuccConstant.GOBRS_ASYNC_THREAD_POOL, property -> {
            log.warn("Detected a change to the GobrsAsync thread pool configuration in DUCC, property:{}", JSON.toJSONString(property.getValue()));
            ThreadPool threadPool = JSONObject.parseObject(property.getValue().toString(), ThreadPool.class);
            ThreadPoolExecutor executor = ThreadPoolBuilder.buildByThreadPool(threadPool);
            factory.setThreadPoolExecutor(executor);
            // Thread pool updated successfully
            log.warn("GobrsAsync thread pool update success");
        }));
    }

}
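
The listener above builds a new executor and hands it to the factory. As a complementary, JDK-only sketch, an existing ThreadPoolExecutor can also be resized in place when only the pool sizes change. This is an alternative technique, not something Gobrs-Async is stated to do; PoolResizeSketch and resize are hypothetical names. The order of the two setter calls matters, because the JDK may reject a core size larger than the current maximum or a maximum smaller than the current core size.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not part of Gobrs-Async.
final class PoolResizeSketch {

    /** Applies new core/max sizes to a live pool without ever passing through core > max. */
    static void resize(ThreadPoolExecutor pool, int core, int max) {
        if (core < 0 || max <= 0 || max < core) {
            throw new IllegalArgumentException("invalid sizes: core=" + core + ", max=" + max);
        }
        if (max >= pool.getMaximumPoolSize()) {
            // Growing: raise the ceiling first, then the core size.
            pool.setMaximumPoolSize(max);
            pool.setCorePoolSize(core);
        } else {
            // Shrinking: lower the core size first, then the ceiling.
            pool.setCorePoolSize(core);
            pool.setMaximumPoolSize(max);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10000));
        try {
            resize(pool, 210, 600); // values from the configuration center example
            System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // 210/600
            resize(pool, 5, 8);
            System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // 5/8
        } finally {
            pool.shutdown();
        }
    }
}
```

Resizing in place keeps queued tasks where they are, whereas swapping in a new executor leaves the old one to be drained and shut down separately.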


Configuration priority

Real-time update configuration > API configuration > (yml, yaml, properties) configuration

Plug-in mechanism

Gobrs-Async uses the SPI mechanism to build its plug-in system. Users only need to add the plug-in dependencies provided by the SDK to enable a plug-in. The following two plug-ins are currently supported, and more will be added in the future.

Monitoring series

SkyWalking plugin

SkyWalking is a full-link monitoring platform. Because SkyWalking does not carry the traceId across threads, Gobrs-Async provides a SkyWalking plugin.

pom.xml


<dependency>
    <groupId>io.github.memorydoc</groupId>
    <artifactId>gobrs-async-skywalking-plugin</artifactId>
    <version>1.2.9-RELEASE</version>
</dependency>

Adding this dependency is all it takes to integrate with SkyWalking. Isn't that amazing!
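
To see why a traceId gets lost across threads in the first place, here is a minimal JDK-only sketch. It is not how SkyWalking or the Gobrs-Async plugin works internally; it only shows the general problem (a ThreadLocal value is not visible on a pool thread) and one common remedy (capturing the value when the task is wrapped and restoring it on the worker). TraceIdPropagationSketch, TRACE_ID and withTraceId are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration only; not the SkyWalking or Gobrs-Async plugin code.
final class TraceIdPropagationSketch {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Captures the caller's trace id at wrap time and restores it around the task on the worker. */
    static <T> Callable<T> withTraceId(Callable<T> task) {
        String captured = TRACE_ID.get();
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                return task.call();
            } finally {
                // Put the worker thread back the way it was, so the id does not leak into later tasks.
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set("11780902254572224");
            Callable<String> readTraceId = () -> TRACE_ID.get();

            String plain = pool.submit(readTraceId).get();
            String wrapped = pool.submit(withTraceId(readTraceId)).get();

            System.out.println("plain   = " + plain);   // plain   = null
            System.out.println("wrapped = " + wrapped); // wrapped = 11780902254572224
        } finally {
            pool.shutdown();
            TRACE_ID.remove();
        }
    }
}
```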

Log series

Full link traceId plugin

As every developer knows, the full link traceId is a serial number printed in the logs to make link tracing easier. With it, you can easily track down problems in production; it is simple and easy to use.

pom.xml


<dependency>
    <groupId>io.github.memorydoc</groupId>
    <artifactId>gobrs-async-trace-plugin</artifactId>
    <version>1.2.9-RELEASE</version>
</dependency>

Static injection

Add the following to your Spring Boot startup class:


static {
    GobrsLogger.logger();
}

Summary

Gobrs-Async provides high-performance multi-thread management and multi-threaded concurrent orchestration, and will continue to roll out visual management of multi-threaded tasks (thread pool monitoring, log analysis, dynamic tasks, and so on) in the future. If you want to swim in the world of multi-threaded programming, follow Gobrs-Async, and remember to give the project a small ❤️ star~

Your star is my biggest open source motivation!

Links

For more on quick start and the performance stress-test report, please visit the official website.

Official website address

Gitee

GitHub
