本文共 3500 字,大约阅读时间需要 11 分钟。
dpdk中采用多核编程代替多线程模型,详细分析如下:在环境初始化函数rte_eal_init的最后有下面这段code#遍历系统中所有的cpuRTE_LCORE_FOREACH_SLAVE(i) { /* * create communication pipes between master thread * and children */ #创建两个pipe,用于通知开始执行线程 if (pipe(lcore_config[i].pipe_master2slave) < 0) rte_panic("Cannot create pipe\n"); if (pipe(lcore_config[i].pipe_slave2master) < 0) rte_panic("Cannot create pipe\n"); lcore_config[i].state = WAIT; /* create a thread for each lcore */ #为每个核创建一个线程,线程的回调函数是eal_thread_loop ret = pthread_create(&lcore_config[i].thread_id, NULL, eal_thread_loop, NULL); if (ret != 0) rte_panic("Cannot create thread\n"); #修改线程的name /* Set thread_name for aid in debugging. */ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "lcore-slave-%d", i); ret = rte_thread_setname(lcore_config[i].thread_id, thread_name); if (ret != 0) RTE_LOG(DEBUG, EAL, "Cannot set name for lcore thread\n"); }这里我们来具体看看回调函数eal_thread_loop/* main loop of threads */__attribute__((noreturn)) void *eal_thread_loop(__attribute__((unused)) void *arg){ char c; int n, ret; unsigned lcore_id; pthread_t thread_id; int m2s, s2m; char cpuset[RTE_CPU_AFFINITY_STR_LEN]; #得到当前的线程id thread_id = pthread_self(); #将这个线程绑定在这个cpu上 /* set CPU affinity */ if (eal_thread_set_affinity() < 0) rte_panic("cannot set affinity\n"); ret = eal_thread_dump_affinity(cpuset, RTE_CPU_AFFINITY_STR_LEN); RTE_LOG(DEBUG, EAL, "lcore %u is ready (tid=%p;cpuset=[%s%s])\n", lcore_id, thread_id, cpuset, ret == 0 ? "" : "..."); /* read on our pipe to get commands */ while (1) { void *fct_arg; #开始等待通过pipe发过来执行的命令 /* wait command */ do { n = read(m2s, &c, 1); } while (n < 0 && errno == EINTR); if (n <= 0) rte_panic("cannot read on configuration pipe\n"); lcore_config[lcore_id].state = RUNNING; /* send ack */ #回复即将开始执行线程 n = 0; while (n == 0 || (n < 0 && errno == EINTR)) n = write(s2m, &c, 1); if (n < 0) rte_panic("cannot write on configuration pipe\n"); /* call the function and store the return value */ fct_arg = lcore_config[lcore_id].arg; #开始执行在这个核上的函数,并传递参数 ret = lcore_config[lcore_id].f(fct_arg); lcore_config[lcore_id].ret = ret; rte_wmb(); #设置flag 表示已经执行完成,开始下一次等待 lcore_config[lcore_id].state = FINISHED; }}从这里知道所谓的多核编程就是每个核上有一个thread,平时这个thread处于sleep状态,要是有人通过pipe发送信号则开始执行这个核上的函数。那下来我们看看如何给这个线程发送命令开始执行呢?答案是通过下面这个函数进行 rte_eal_mp_remote_launch(sync_func, NULL, SKIP_MASTER); intrte_eal_mp_remote_launch(int (*f)(void *), void *arg, enum rte_rmt_call_master_t call_master){ int lcore_id; int master = rte_get_master_lcore(); /* check state of lcores */ #如果当前核上的线程不是wait状态说明正在执行其他任务则退出 RTE_LCORE_FOREACH_SLAVE(lcore_id) { if (lcore_config[lcore_id].state != WAIT) return -EBUSY; } /* send messages to cores */ #发送信号给所有的核 RTE_LCORE_FOREACH_SLAVE(lcore_id) { rte_eal_remote_launch(f, arg, lcore_id); } return 0;}intrte_eal_remote_launch(int (*f)(void *), void *arg, unsigned slave_id){ int n; char c = 0; int m2s = lcore_config[slave_id].pipe_master2slave[1]; int s2m = lcore_config[slave_id].pipe_slave2master[0]; if (lcore_config[slave_id].state != WAIT) return -EBUSY; #保存在这个核上要执行的函数f和其参数arg lcore_config[slave_id].f = f; lcore_config[slave_id].arg = arg; /* send message */ #开始发送命令开始执行任务 n = 0; while (n == 0 || (n < 0 && errno == EINTR)) n = write(m2s, &c, 1); if (n < 0) rte_panic("cannot write on configuration pipe\n"); /* wait ack */ #等待回复 do { n = read(s2m, &c, 1); } while (n < 0 && errno == EINTR); if (n <= 0) rte_panic("cannot read on configuration pipe\n"); return 0;}
转载地址:http://onnmi.baihongyu.com/