跳转至

线程池动态切换方案

需求背景

Per_thread采用每个用户连接对应一个handle_connection线程的方式处理用户请求。Thread_pool采用一定的工作线程池(线程池大小通常与CPU核数相同)来处理用户连接请求。Thread_pool模式与Per_thread模式相比,更适合处理大量用户的短查询请求,可以避免频繁创建和销毁线程,避免并发数较高的情况下的临界资源争抢。Thread_pool的不足在于当用户请求偏向于慢查询时,工作线程阻塞在高时延操作上,难以快速响应新的用户请求,导致系统吞吐量反而相较于Per_thread模式更低。

Per_thread模式与Thread_pool模式各有优缺点,系统需要根据用户的业务类型灵活地进行切换。遗憾的是,当前两种模式的切换必须重启服务器才能完成。通常而言,两种模式相互转换的需求都是出现在业务高峰时段,此时强制重启服务器将对用户业务造成严重影响。为了提高Per_thread模式与Thread_pool模式切换的灵活程度,提出了线程池动态切换的需求。

Per_thread模式分析

Per_thread模式为避免频繁创建和销毁pthread,对handle_connection有一个重用逻辑,下面以伪代码的方式分析Per_thread对用户连接的处理逻辑:

hanlde_connection 在退出前调用了my_thread_end函数
    my_thread_init
    for(;;) {
        thd = init_new_thd(channel_info)
        if (pthread_reused) //这里的复用是指一个线程退出了,也需要调用end_connection,close_connection等函数清理资源,最后阻塞在block_until_new_connection等待新的连接进来以后复用该pthread,挺有借鉴意义的
            PSI_THREAD_CALL(new_thread)(key_thread_one_connection, thd, thd->thread_id()); 
            PSI_THREAD_CALL(set_thread_os_id)(psi); 
            PSI_THREAD_CALL(set_thread)(psi);
        mysql_thread_set_psi_id(thd->thread_id());
        mysql_thread_set_psi_THD(thd);
        mysql_socket_set_thread_owner(thd->get_protocol_classic()->get_vio()->mysql_socket);

        thd_manager->add_thd(thd);//在thd_manager中注册新的连接
        thd_prepare_connection(thd)//新请求的连接与认证
        while (thd_connection_alive(thd))//不断处理用户请求直至连接退出
            do_command
        end_connection//连接成功后退出
        close_connection//关闭连接
        thd->get_stmt_da()->reset_diagnostics_area();
        thd->release_resources(); //清理thd资源
        thd_manager->remove_thd(thd);//在thd_manager中移除连接

        Connection_handler_manager::dec_connection_count(extra_port_connection);//全局线程连接数减一
        thd->set_psi(NULL);//清空thd psi信息
        PSI_THREAD_CALL(delete_current_thread)();
        delete thd;//删除thd
        Per_thread_connection_handler::block_until_new_connection(); //避免关闭当前handle_connection线程,阻塞至有新连接请求,重新当前线程
        pthread_reused = true;
    }

    my_thread_end();
        PSI_THREAD_CALL(delete_current_thread);
        --THR_thread_count
        set_mysys_thread_var(NULL);
    my_thread_exit(0);//handle_connection线程退出
        pthread_exit

Thread_pool模式分析

Thread_pool模式中,每个CPU对应一个线程组,线程组通常只有一个或多个活跃的worker线程和一个listener线程,每个线程组创建的线程上限为threadpool_oversubscribe。

全局的timer线程定期检查各个线程组,如果发现线程组中event queue很久未被消费,则唤醒或创建新的工作线程。

listener线程与worker线程可以灵活切换,当未到oversubscribed状态,如果线程组没有listener线程,则worker thread可以转化为listener线程。达到oversubscribed状态后,listener线程可能充当worker thread处理用户请求。

当新用户请求连接时,会按照一定的方式将其分配到对应的线程组中,该连接后续的所有请求都交于该工作组处理,直至该用户连接退出。

所有的用户请求(包括新连接的实际建立)都对应于一个Thread_pool event。listener线程用于监听用户请求,出现新event时将event按优先级存放到high_prio_queue或queue中,high_prio_queue中的event将在worker线程中优先被处理。worker线程的职责是不断重复get_event和handle_event的操作。下面来看handle_event的伪代码:

handle_event
    if (!connection->logged_in)
        err = threadpool_add_connection(connection->thd); //调用thd_prepare_connection进行新请求的连接与认证
        connection->logged_in = true;
    else
        err = threadpool_process_request(connection->thd); //调用do_command处理用户请求;详细堆栈如下

    err = start_io(connection); //1. 如果group_size变化,则调整connection到新的group;2. 重新bind to poll
    if (err) connection_abort(connection); //退出连接,详细堆栈如下


 threadpool_process_request
    thread_attach(thd);
    thd_set_net_read_write(thd, 0);
    do_command(thd);
    MYSQL_SOCKET_SET_STATE(thd->get_protocol_classic()->get_vio() ->mysql_socket, PSI_SOCKET_STATE_IDLE);
    MYSQL_START_IDLE_WAIT(thd->m_idle_psi, &thd->m_idle_state);
    thd->m_server_idle= true; 

connection_abort
    threadpool_remove_connection(connection->thd);
        thread_attach
        thd_set_net_read_write(thd, 0);
        end_connection(thd);       
        close_connection(thd, 0);  
        thd->release_resources();
        PSI_THREAD_CALL(delete_thread)(thd->get_psi());
        Global_THD_manager::get_instance()->remove_thd(thd);
        Connection_handler_manager::dec_connection_count();
        delete thd;
    group->connection_count--;

Per_thread to Thread_pool

Per_thread to Thread_pool分为两部分。第一部分是新连接自动切换为Thread_pool方式,这种方式只需要在内核中支持Per_thread、Thread_pool两种模式共存,并且采用Connection_handler_manager::thread_handling来控制新连接的处理方式即可,较为简单,这里不再详细讨论。

下面重点讨论如何将尚未关闭的Per_thread连接平滑迁移到Thread_pool中。

正常Per_thread模式下,handle_connection线程在thd_connection_alive(thd)时会不断循环do_command。

Per_thread to Thread_pool的关键在于打破上述循环,将已经完成连接的thd平滑迁移至Thread_pool某线程组的event queue中,并顺势关闭thd原先所在的handle_connection线程。

tp_migrate 类似于 Thread_pool_connection_handler::add_connection

Thread_pool_connection_handler::add_connection
{
    //1. 获取channel,通过channel创建thd
    THD *thd = channel_info->create_thd();
    //2. 创建connection,event队列中的基本元素
    connection_t *connection = alloc_connection(thd);
    //3. 设置新thread_id和线程开始时间
    thd->set_new_thread_id();
    thd->start_utime= thd->thr_create_utime= my_micro_time();
    //4. 设置线程的调度方法
    thd->scheduler= &tp_event_functions;
    //5. 将thd添加至全局的THD_manager中
    Global_THD_manager::get_instance()->add_thd(thd);
    //6. 线程池相关设置;connection放入相应的thread group中
    thd->event_scheduler.data= connection;
    thread_group_t *group= &all_groups[thd->thread_id() % group_count];
    connection->thread_group= group;
    group->connection_count++; 
    queue_put(group, connection);
}

由于THD在one_connection_one_thread阶段已经完成了THD_manager的注册,设置了thread_id和start_utime,完成了注册连接,因此上述步骤中1,3,5都不再需要。

handle_event的主要逻辑已经在第3节展示过,这里为了完成thd from Per_thread to Thread_pool的平滑迁移,还需要对thd做一定的装备:

handle_event
    if (!connection->logged_in)
        err = threadpool_add_connection(connection->thd); //调用thd_prepare_connection进行新请求的连接与认证
        connection->logged_in = true;
    else
        if (connection->from_per_thread) //如果是来自于Per_thread的thd,还需要进行thd thread_pool相关的状态设置
            connection->from_per_thread = false;
            threadpool_process_request_prepare(connection->thd)
        err = threadpool_process_request(connection->thd); //调用do_command处理用户请求;

    err = start_io(connection); //1. 如果group_size变化,则调整connection到新的group;2. 重新bind to poll
    if (err) connection_abort(connection); //退出连接,详细堆栈如下

迁移失败则继续使用Per_thread模式处理该连接,并且打印一条错误日志。

Thread_pool to Per_thread

Thread_pool to Per_thread同样分为两部分。第一部分是将新连接全部交付给Per_thread模式处理,这种方式同样是只需要通过Connection_handler_manager::thread_handling控制新连接的处理模式即可,我们也不再详细讨论。

下面讨论,如何将运行中Thread_pool connection平滑迁移至Per_thread中。

Thread_pool模式下的worker线程每次执行完threadpool_add_connection或threadpool_process_request会调用start_io,将connection重新bind to pool。这是一个合适的切换时间点。在此刻需要伪造channel_info,并新增channel_info->from_thread_pool状态。利用上述伪造的channel_info创建handle_connection线程。

handle_event
    if (!connection->logged_in)
        err = threadpool_add_connection(connection->thd); //调用thd_prepare_connection进行新请求的连接与认证
        connection->logged_in = true;
    else
        if (connection->from_per_thread) //如果是来自于Per_thread的thd,还需要进行thd thread_pool相关的状态设置
            connection->from_per_thread = false;
            threadpool_process_request_prepare(connection->thd)
        err = threadpool_process_request(connection->thd); //调用do_command处理用户请求;

    ulong tmp_thread_handling = Connection_handler_manager::thread_handling;
    if (Connection_handler_manager::thread_handling == SCHEDULER_ONE_THREAD_PER_CONNECTION)
        Connection_handler_manager *handler_manager = Connection_handler_manager::get_instance();
        handler_manager->get_connection_handler(tmp_thread_handling)->migrate(thd);//在migrate中伪造的channel_info,设置from_thread_pool为true,创建handle_connection线程
        clean_connection_from_thread_pool(connection);//从Thread_pool中清理该连接的信息,可借鉴connection_abort(),但不需要关闭连接和清理thd
        return;

    err = start_io(connection); //1. 如果group_size变化,则调整connection到新的group;2. 重新bind to poll
    if (err) connection_abort(connection); //退出连接,详细堆栈如下
    return;

在handle_connection线程中,通过channel_info->from_thread_pool状态跳过init_new_thd,thd_manager->add_thd(thd),以及实际连接和认证过程。在进行必要的状态设置后,直接进入do_command环节。

hanlde_connection 在退出前调用了my_thread_end函数
    my_thread_init
    for(;;) {
        if (!channel_info->from_thread_pool)
            thd = init_new_thd(channel_info) //利用from_thread_pool状态跳过
        else
            handle_connection_prepare//相应的状态准备
        if (pthread_reused || channel_info->from_thread_pool) //from_thread_pool状态下运行
            PSI_THREAD_CALL(new_thread)(key_thread_one_connection, thd, thd->thread_id()); 
            PSI_THREAD_CALL(set_thread_os_id)(psi); 
            PSI_THREAD_CALL(set_thread)(psi);
        mysql_thread_set_psi_id(thd->thread_id());
        mysql_thread_set_psi_THD(thd);
        mysql_socket_set_thread_owner(thd->get_protocol_classic()->get_vio()->mysql_socket);

        if (!channel_info->from_thread_pool)
            thd_manager->add_thd(thd);//在thd_manager中注册新的连接
            thd_prepare_connection(thd)//新请求的连接与认证

        while (thd_connection_alive(thd))//不断处理用户请求直至连接退出
            do_command
        end_connection//连接成功后退出
        close_connection//关闭连接
        thd->get_stmt_da()->reset_diagnostics_area();
        thd->release_resources(); //清理thd资源
        thd_manager->remove_thd(thd);//在thd_manager中移除连接

        Connection_handler_manager::dec_connection_count(extra_port_connection);//全局线程连接数减一
        thd->set_psi(NULL);//清空thd psi信息
        PSI_THREAD_CALL(delete_current_thread)();
        delete thd;//删除thd
        Per_thread_connection_handler::block_until_new_connection(); //避免关闭当前handle_connection线程,阻塞至有新连接请求,重新当前线程
        pthread_reused = true;
    }

    my_thread_end();
        PSI_THREAD_CALL(delete_current_thread);
        --THR_thread_count
        set_mysys_thread_var(NULL);
    my_thread_exit(0);//handle_connection线程退出
        pthread_exit

迁移失败则继续使用Thread_pool模式处理该连接,并且打印一条错误日志。