CANN/shmem 初始化与通信示例
初始化【免费下载链接】shmemCANN SHMEM 是面向昇腾平台的多机多卡内存通信库基于OpenSHMEM 标准协议实现跨设备的高效内存访问与数据同步。项目地址: https://gitcode.com/cann/shmem该样例工程位于examples/init文件夹下。SHMEM当前提供了三种flag用于初始化 - ACLSHMEMX_INIT_WITH_DEFAULT 默认初始化流程可以不依赖第三方库完成初始化但需要保证设定的ipport空闲 - ACLSHMEMX_INIT_WITH_MPI 依赖MPI的多进程管理能力的初始化流程 - ACLSHMEMX_INIT_WITH_UNIQUEID 运行时依赖第三方库广播能力MPI_Bcast/torch.distributed.broadcast等的初始化流程在该样例中分别实现了SHMEM提供的所有初始化流程调用方式。可以通过脚本参数编译运行不同flag对应的初始化流程。默认使用回环地址127.0.0.1端口8666。该用例仅用以单机示范请保证端口空闲可绑定。ACLSHMEMX_INIT_WITH_DEFAULT样例运行在根目录调用scripts/build.sh控制后端。bash scripts/build.shcd examples/init # 编译并运行两个pe的default初始化流程 bash run.sh -mode default -pesize 2代码实现SHMEM初始化接口调用前需要完成attributes创建此用例提供了一种attributes构建方式也可根据实际需要参考aclshmemx_init_attr_t定义自行设置。// attributes设置函数 int32_t test_set_attr(int32_t my_pe, int32_t n_pes, uint64_t local_mem_size, const char *ip_port, aclshmemx_uniqueid_t *default_flag_uid, aclshmemx_init_attr_t *attributes) { size_t ip_len 0; if (ip_port ! nullptr) { ip_len std::min(strlen(ip_port), (size_t)(ACLSHMEM_MAX_IP_PORT_LEN - 1)); std::copy_n(ip_port, ip_len, attributes-ip_port); if (attributes-ip_port[0] \0) { return ACLSHMEM_INVALID_VALUE; } } int attr_version (1 16) sizeof(aclshmemx_init_attr_t); attributes-my_pe my_pe; attributes-n_pes n_pes; attributes-ip_port[ip_len] \0; attributes-local_mem_size local_mem_size; attributes-option_attr {attr_version, ACLSHMEM_DATA_OP_MTE, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT}; attributes-comm_args reinterpret_castvoid *(default_flag_uid); aclshmemx_uniqueid_t *uid_args (aclshmemx_uniqueid_t *)(attributes-comm_args); return ACLSHMEM_SUCCESS; }//default初始化流程 int run_main(int argc, char* argv[]) { int pe atoi(argv[1]); int pe_size atoi(argv[2]); std::string ipport argv[3]; aclshmemx_uniqueid_t default_flag_uid ACLSHMEM_UNIQUEID_INITIALIZER; aclshmemx_init_attr_t attributes; int status ACLSHMEM_SUCCESS; // acl初始化 aclInit(nullptr); aclrtSetDevice(pe); uint64_t local_mem_size 1024 * 1024 * 1024; // 创建attributes test_set_attr(pe, pe_size, local_mem_size, ipport.c_str(), default_flag_uid, attributes); // 初始化 status aclshmemx_init_attr(ACLSHMEMX_INIT_WITH_DEFAULT, attributes); /* 你的任务 */ //去初始化 status shmem_finalize(); aclrtResetDevice(pe); aclFinalize(); return 0; }ACLSHMEMX_INIT_WITH_MPI样例运行编译前需要安装并配置MPI环境变量不配置无法编译MPI能力so。# mpich安装在默认路径时可参考需根据实际情况替换。 export PATH/usr/local/mpich/bin:$PATH export LD_LIBRARY_PATH/usr/local/mpich/lib:$LD_LIBRARY_PATH在根目录调用scripts/build.shbash scripts/build.shcd examples/init # 编译并运行两个pe的mpi初始化流程 bash run.sh -mode mpi -pesize 2代码实现//mpi初始化流程 int run_main() { // MPI初始化 MPI_Init(nullptr, nullptr); int pe; int pe_size; MPI_Comm_rank(MPI_COMM_WORLD, pe); MPI_Comm_size(MPI_COMM_WORLD, pe_size); int status ACLSHMEM_SUCCESS; // acl初始化 aclInit(nullptr); aclrtSetDevice(pe); uint64_t local_mem_size 1024 * 1024 * 1024; // 创建attributes依赖MPI无需使用ipport可以不传。 aclshmemx_init_attr_t attributes { pe, pe_size, , local_mem_size, {0, ACLSHMEM_DATA_OP_MTE, 120, 120, 120}}; // 初始化 status aclshmemx_init_attr(ACLSHMEMX_INIT_WITH_MPI, attributes); /* 你的任务 */ //去初始化 status shmem_finalize(); aclrtResetDevice(pe); aclFinalize(); MPI_Finalize(); return 0; }ACLSHMEMX_INIT_WITH_UNIQUEID样例运行编译前需要安装并配置MPI环境变量不配置无法编译MPI能力so。# mpich安装在默认路径时可参考需根据实际情况替换。 export PATH/usr/local/mpich/bin:$PATH export LD_LIBRARY_PATH/usr/local/mpich/lib:$LD_LIBRARY_PATH在根目录调用scripts/build.sh控制后端。bash scripts/build.shcd examples/init # 编译并运行两个pe的mpi初始化流程 bash run.sh -mode mpi -pesize 2代码实现//uid初始化流程 int run_main() { // MPI初始化 MPI_Init(nullptr, nullptr); int pe; int pe_size; MPI_Comm_rank(MPI_COMM_WORLD, pe); MPI_Comm_size(MPI_COMM_WORLD, pe_size); int status ACLSHMEM_SUCCESS; // acl初始化 aclInit(nullptr); aclrtSetDevice(pe); aclshmemx_init_attr_t attributes; aclshmemx_uniqueid_t uid ACLSHMEM_UNIQUEID_INITIALIZER; int64_t local_mem_size 1024 * 1024 * 1024; // pe 0 获取uid if (pe 0) { status aclshmemx_get_uniqueid(uid); } // 广播pe 0 uid到其他pe MPI_Bcast(uid, sizeof(aclshmemx_uniqueid_t), MPI_UINT8_T, 0, MPI_COMM_WORLD); // 调用uid的set接口创建attributes。 status aclshmemx_set_attr_uniqueid_args(pe, pe_size, local_mem_size, uid, attributes); // 初始化 status aclshmemx_init_attr(ACLSHMEMX_INIT_WITH_UNIQUEID, attributes); /* 你的任务 */ //去初始化 status shmem_finalize(); aclrtResetDevice(pe); aclFinalize(); MPI_Finalize(); return 0; }AllGather该样例工程位于examples/allgather文件夹下。在该样例中实现了一个通信量较小(单PE通信量小于2MB)情况下有着更低时延的AllGather纯通信算子。 各PE首先将存在本端input地址下的数据push到本PE的对称内存上确认远端PE的任务完成后从远端PE的对称内存拉取对应PE的数据从而整体完成AllGather的操作。这个样例展示了多种shmem API的用法包括aclshmemx_mte_put_nbi、aclshmemx_signal_op以及aclshmemx_mte_get_nbi等用于p2p的通信以及同步任务。核函数实现#include kernel_operator.h #include acl/acl.h #include shmem.h using namespace AscendC; constexpr int64_t SYNC_FLAG_INTERVAL 16; constexpr int64_t UB_DMA_MAX_SIZE 190 * 1024; constexpr int64_t GVA_BUFF_MAX_SIZE 100 * 1024 * 1024; templatetypename T ACLSHMEM_DEVICE void all_gather_small_data(uint64_t fftsAddr, __gm__ T* input, __gm__ T* output, __gm__ T* gva, int elements, int magic) { const int64_t aivNum GetBlockNum() * 2; const int64_t aivIndex GetBlockIdx(); const int64_t data_offset aivNum * SYNC_FLAG_INTERVAL; const int64_t flag_offset aivIndex * SYNC_FLAG_INTERVAL; int64_t my_rank aclshmem_my_pe(); int64_t pe_size aclshmem_n_pes(); __gm__ T *input_gm (__gm__ T *)input; __gm__ T *output_gm (__gm__ T *)output; __gm__ T *gva_data_gm (__gm__ T*)((__gm__ int32_t*)gva data_offset); __gm__ int32_t *gva_sync_gm (__gm__ int32_t *)gva; __ubuf__ T* tmp_buff (__ubuf__ T*)(64); // data move parameters const uint32_t ub_size UB_DMA_MAX_SIZE; uint32_t input_offset, output_offset, gva_offset, num_per_core; // [AllGather Step 1] local input gm - symmetric mem. num_per_core elements / aivNum; input_offset aivIndex * num_per_core; gva_offset aivIndex * num_per_core; if (aivIndex aivNum - 1) { num_per_core elements - num_per_core * aivIndex; } aclshmemx_mte_put_nbi(gva_data_gm gva_offset, input_gm input_offset, tmp_buff, ub_size, num_per_core, my_rank, EVENT_ID0); const int64_t core_per_rank aivNum / pe_size; const int64_t core_rank_idx aivIndex % core_per_rank; const int64_t x aivIndex / core_per_rank; // Sync Ensure Corresponding Tasks Done. aclshmem_quiet(); aclshmemi_barrier_core_soft(); aclshmemx_signal_op(gva_sync_gm flag_offset, magic, ACLSHMEM_SIGNAL_SET, my_rank); aclshmem_signal_wait_until((__gm__ int32_t *)aclshmem_ptr(gva_sync_gm, x) flag_offset, ACLSHMEM_CMP_EQ, magic); // [AllGather Step 2] symmetric mem - local output. num_per_core elements / core_per_rank; output_offset x * elements core_rank_idx * num_per_core; gva_offset core_rank_idx * num_per_core; if (core_rank_idx core_per_rank - 1) { num_per_core elements - num_per_core * core_rank_idx; } aclshmemx_mte_get_nbi(output_gm output_offset, gva_data_gm gva_offset, tmp_buff, ub_size, num_per_core, x, EVENT_ID0); }【免费下载链接】shmemCANN SHMEM 是面向昇腾平台的多机多卡内存通信库基于OpenSHMEM 标准协议实现跨设备的高效内存访问与数据同步。项目地址: https://gitcode.com/cann/shmem创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考