Skip to content

Commit 6be9b40

Browse files
authored
P2p collective (#274)
* add dual directional test for nixl * add send recv over ipc * add simple memcpy_peer py code: one GPU only has one DMA, on mi300x, this can reach 45GB/s * handling local and remote connections in collectives * add working ipc transfer but hit data consistency problems * fixing gpuSetDevice bugs for gpuIpcCloseMemHandle. Now p2p has not memory consistency bugs * fixing value errors * slightly improve perf * adding multi-stream * find the performance anomaly is the cudaIpcCloudMemHande * expose remote_gpu_idx via uccl_accept() * nits
1 parent d3dadb3 commit 6be9b40

34 files changed

+1579
-189
lines changed

include/util/gpu_rt.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,24 @@
4242
#define gpuMemcpyPeerAsync cudaMemcpyPeerAsync
4343
#define gpuMemcpyDeviceToDevice cudaMemcpyDeviceToDevice
4444
#define gpuMemcpyFromSymbol cudaMemcpyFromSymbol
45+
#define gpuMemsetAsync cudaMemsetAsync
4546
#define gpuGetLastError cudaGetLastError
4647
#define gpuErrorPeerAccessAlreadyEnabled cudaErrorPeerAccessAlreadyEnabled
48+
#define gpuEvent_t cudaEvent_t
49+
#define gpuEventCreate cudaEventCreate
50+
#define gpuEventDestroy cudaEventDestroy
51+
#define gpuEventRecord cudaEventRecord
52+
#define gpuEventQuery cudaEventQuery
53+
#define gpuEventSynchronize cudaEventSynchronize
54+
#define gpuStreamWaitEvent cudaStreamWaitEvent
55+
#define gpuEventCreateWithFlags cudaEventCreateWithFlags
56+
#define gpuEventDefault cudaEventDefault
57+
#define gpuEventDisableTiming cudaEventDisableTiming
58+
#define gpuEventInterprocess cudaEventInterprocess
59+
#define gpuIpcEventHandle_t cudaIpcEventHandle_t
60+
#define gpuIpcGetEventHandle cudaIpcGetEventHandle
61+
#define gpuIpcOpenEventHandle cudaIpcOpenEventHandle
62+
#define gpuIpcCloseEventHandle cudaIpcCloseEventHandle
4763
#else
4864
#include <hip/hip_runtime.h>
4965
#include <hip/hip_runtime_api.h>
@@ -87,8 +103,24 @@
87103
#define gpuMemcpyPeerAsync hipMemcpyPeerAsync
88104
#define gpuMemcpyDeviceToDevice hipMemcpyDeviceToDevice
89105
#define gpuMemcpyFromSymbol hipMemcpyFromSymbol
106+
#define gpuMemsetAsync hipMemsetAsync
90107
#define gpuGetLastError hipGetLastError
91108
#define gpuErrorPeerAccessAlreadyEnabled hipErrorPeerAccessAlreadyEnabled
109+
#define gpuEvent_t hipEvent_t
110+
#define gpuEventCreate hipEventCreate
111+
#define gpuEventDestroy hipEventDestroy
112+
#define gpuEventRecord hipEventRecord
113+
#define gpuEventSynchronize hipEventSynchronize
114+
#define gpuEventQuery hipEventQuery
115+
#define gpuStreamWaitEvent hipStreamWaitEvent
116+
#define gpuEventCreateWithFlags hipEventCreateWithFlags
117+
#define gpuEventDefault hipEventDefault
118+
#define gpuEventDisableTiming hipEventDisableTiming
119+
#define gpuEventInterprocess hipEventInterprocess
120+
#define gpuIpcEventHandle_t hipIpcEventHandle_t
121+
#define gpuIpcGetEventHandle hipIpcGetEventHandle
122+
#define gpuIpcOpenEventHandle hipIpcOpenEventHandle
123+
#define gpuIpcCloseEventHandle(handle) (gpuSuccess)
92124
#endif
93125

94126
#define GPU_RT_CHECK(call) \

include/util/util.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,40 @@ inline int send_message(int sockfd, void const* buffer, size_t n_bytes) {
9898
return bytes_sent;
9999
}
100100

101+
inline int receive_message_nonblock(int sockfd, void* buffer, size_t n_bytes) {
102+
int bytes_read = 0;
103+
int r;
104+
while (bytes_read < static_cast<int>(n_bytes)) {
105+
r = read(sockfd, static_cast<char*>(buffer) + bytes_read,
106+
static_cast<size_t>(n_bytes - bytes_read));
107+
if (r < 0 && !(errno == EAGAIN || errno == EWOULDBLOCK)) {
108+
CHECK(false) << "ERROR reading from socket";
109+
}
110+
if (r > 0) {
111+
bytes_read += r;
112+
}
113+
}
114+
return bytes_read;
115+
}
116+
117+
inline int send_message_nonblock(int sockfd, void const* buffer,
118+
size_t n_bytes) {
119+
int bytes_sent = 0;
120+
int r;
121+
while (bytes_sent < static_cast<int>(n_bytes)) {
122+
// Make sure we write exactly n_bytes
123+
r = write(sockfd, static_cast<char const*>(buffer) + bytes_sent,
124+
n_bytes - bytes_sent);
125+
if (r < 0 && !(errno == EAGAIN || errno == EWOULDBLOCK)) {
126+
CHECK(false) << "ERROR writing to socket";
127+
}
128+
if (r > 0) {
129+
bytes_sent += r;
130+
}
131+
}
132+
return bytes_sent;
133+
}
134+
101135
inline void send_ready(int bootstrap_fd) {
102136
bool ready = true;
103137
int ret = send_message(bootstrap_fd, &ready, sizeof(bool));

0 commit comments

Comments
 (0)