Skip to content

Optimizes the Margo DTL's memory usage by matching the behavior of the UCX DTL#178

Open
ilumsden wants to merge 6 commits into
flux-framework:mainfrom
TauferLab:margo_memory_optimization
Open

Optimizes the Margo DTL's memory usage by matching the behavior of the UCX DTL#178
ilumsden wants to merge 6 commits into
flux-framework:mainfrom
TauferLab:margo_memory_optimization

Conversation

@ilumsden

@ilumsden ilumsden commented Jun 5, 2026

Copy link
Copy Markdown
Collaborator

I was using DYAD on Tuolumne recently, and I found two issues with DYAD.

First, I found that DYAD would crash whenever trying to send/recv data with a Mercury NA protocol string of ofi+cxi (i.e., what should be used for Slingshot). This was due to an oversight in the dyad_dtl_margo_rpc_pack function. That function assumes that the network address will fit into a statically sized 128 byte buffer. That assumption is invalid on Tuo because the Slingshot network normally uses network addresses larger than 128 bytes.

This PR fixes this first issue by updating dyad_dtl_margo_rpc_pack to call margo_addr_to_string twice. The first call is used to get the actual length of the network address. Then, after a malloc, the second call is used to actually obtain the network address in full.

Second, I found that DYAD was performing much worse than expected. When reviewing the code, I found that the main issue was in how the Margo DTL was managing memory.

By allocating a buffer and calling margo_bulk_create for every send/recv, the DTL was essentially incurring the cost of memory allocation + memory pinning on the NIC repeatedly throughout a run. To add onto that, when I started using DYAD on Tuo, the Margo bulk objects were never being freed, resulting in massive memory leaks. Those memory leaks were recently fixed by other PRs, but the underlying alloc + pin cost still exists.

To fix this issue, this PR mimics the memory reuse scheme used by the UCX DTL. It now creates a single allocation and calls margo_bulk_create once during initialization, and it frees and unpins the memory during finalization. On the sender side, the pinned memory buffer is returned to the broker module via dyad_dtl_margo_get_buffer to ensure the module reads directly from local storage into the pinned memory. Similarly, on the receiver side, the pinned memory is passed directly into the margo_bulk_transfer call to avoid excessive allocations. The dyad_dtl_margo_recv function then copies from that pinned memory into a new allocation to return to the DYAD client. This PR also adds C11's stdatomic.h to help the dyad_dtl_margo_recv and data_ready_rpc synchronize on data availability.

This PR also makes the max transfer size for the UCX and Margo DTL backends configurable via the new CMake cache variable DYAD_DTL_MAX_TRANSFER_SIZE. The value of this variable defaults to 4294967296 (i.e., 4 GiB) which matches the existing behavior of the UCX DTL.

I tested these changes on Tuo. They result in much better performance. In fact, with these changes, DYAD can achieve results comparable to a data transfer approach using MPI_Isend and MPI_Irecv in certain workflow situations.

@ilumsden ilumsden requested a review from JaeseungYeom June 5, 2026 18:07
@ilumsden ilumsden added bug Something isn't working enhancement New feature or request labels Jun 5, 2026

@JaeseungYeom JaeseungYeom left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not finished reviewing margo_dtl.c yet but I will come back later.

Comment thread src/dyad/dtl/margo_dtl.c
margo_handle->recv_buffer = NULL;
margo_handle->recv_len = 0;
margo_handle->recv_ready = 0;
atomic_store_explicit (&margo_handle->recv_ready, false, memory_order_relaxed);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I am curious about this. It is good to know that this variable needs to be protected. However, was there incident?
  • Reusing buffer would improve performance. However, that also means that there will be no concurrent transfer.
  • stdatomic seems to be optional with c11. We either bump up the minimum C standard required if the protection is necessary. Otherwise, do test as in cmake/tests at cmake configure time and propagate the choice via dyad_config.h


# UCX implementation for DTL
set(UCX_DTL_SRC ${CMAKE_CURRENT_SOURCE_DIR}/ucx_dtl.c ${CMAKE_CURRENT_SOURCE_DIR}/ucx_ep_cache.cpp)
set(UCX_DTL_SRC ${CMAKE_CURRENT_BINARY_DIR}/ucx_dtl.c ${CMAKE_CURRENT_SOURCE_DIR}/ucx_ep_cache.cpp)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing this, MAX_TRANSFER_SIZE as an env variable.
And parse size characters at the end.
We wouldn't need to change CMakeLists.txt and make it needlessly static.

Comment thread src/dyad/dtl/margo_dtl.c
#include <mercury_macros.h>

// clang-format off
#define MARGO_MAX_TRANSFER_SIZE (@DYAD_DTL_MAX_TRANSFER_SIZE@)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sould be converted into the env variable DYAD_DTL_MAX_TX_SIZE_ENV "DYAD_DTL_MAX_TX_SIZE"

Comment thread src/dyad/dtl/margo_dtl.c
Comment thread src/dyad/dtl/margo_dtl.c
// clang-format on

margo_instance_id mid = margo_hg_handle_get_instance (h);
margo_set_log_level (mid, MARGO_LOG_INFO);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#if define(DYAD_LOGGER_LEVEL_INFO)
margo_set_log_level (mid, MARGO_LOG_INFO);
#endif

Comment thread src/dyad/dtl/margo_dtl.c
{
DYAD_C_FUNCTION_START ();
dyad_rc_t rc = DYAD_RC_OK;
if (data_buf == NULL || *data_buf == NULL) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that static buffer is already checked but it is still safer to check especially if we are going to reallocate.
In UCX case, you had a different problem, an unsolved mystery. That is something to revisit as well.

Comment thread src/dyad/dtl/margo_dtl.c

// dyad_rc_t rc = DYAD_RC_OK;
dyad_dtl_margo_t *margo_handle = NULL;
dyad_rc_t rc = 0;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initialize it with a return code macro.

Comment thread src/dyad/dtl/margo_dtl.h

#include <dyad/dtl/dyad_dtl_api.h>
#include <margo.h>
#include <stdatomic.h>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is guaranteed to be available in C11.

{
dyad_mod_ctx_t *mod_ctx = (dyad_mod_ctx_t *)arg;
flux_msg_handler_delvec (mod_ctx->handlers);
if (mod_ctx->handlers != NULL) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good!

Comment thread CMakeLists.txt
endif()


set(DYAD_DTL_MAX_TRANSFER_SIZE "4294967296" CACHE STRING "Maximum transfer size supported by DYAD in bytes")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want this be rather runtime configurable.
CMake can set the deafult value.
DYAD_DTL_MAX_TX_SIZE_DEFAULT, which should be written to dyad_config.h
This should be commonly used for both UCX and MARGO, client and server.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is from shuffle test.
bool parse_size (const char* str, size_t& out)
{
char* end;
long long val = strtoll (str, &end, 10);
if (val <= 0 || end == str)
return false;

switch (*end) {
    case 'K':
    case 'k':
        out = static_cast<size_t> (val) * 1024ULL;
        break;
    case 'M':
    case 'm':
        out = static_cast<size_t> (val) * 1024ULL * 1024ULL;
        break;
    case 'G':
    case 'g':
        out = static_cast<size_t> (val) * 1024ULL * 1024ULL * 1024ULL;
        break;
    case '\0':
        out = static_cast<size_t> (val);
        break;
    default:
        return false;  // unrecognized suffix
}
return true;

}

@JaeseungYeom

JaeseungYeom commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

Minimum to merge is to make the transfer size parameter environment variable controllable.

Comment thread src/dyad/dtl/margo_dtl.c
}
addr_str = malloc (addr_str_size);
if (addr_str == NULL) {
rc = DYAD_RC_SYSFAIL;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Need to define a RC macro for this as well as other cases where assert is called.

Comment thread src/dyad/dtl/margo_dtl.c
margo_addr_free (margo_handle->mid, margo_handle->remote_addr);
margo_finalize (margo_handle->mid);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this up before if (margo_handle->mid != MARGO_INSTANCE_NULL) {

Comment thread src/dyad/dtl/margo_dtl.c
if (margo_handle->mid != MARGO_INSTANCE_NULL) {
margo_addr_free (margo_handle->mid, margo_handle->local_addr);
if (margo_handle->remote_addr != NULL)
margo_addr_free (margo_handle->mid, margo_handle->remote_addr);

@JaeseungYeom JaeseungYeom Jun 5, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set margo_handle->remote_addr = HG_ADDR_NULL after margo_addr_free()

Comment thread src/dyad/dtl/margo_dtl.c
}

if (margo_handle->mid != MARGO_INSTANCE_NULL) {
margo_addr_free (margo_handle->mid, margo_handle->local_addr);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

margo_handle->local_addr = HG_ADDR_NULL

Comment thread src/dyad/dtl/margo_dtl.c

if (margo_handle->mid != MARGO_INSTANCE_NULL) {
margo_addr_free (margo_handle->mid, margo_handle->local_addr);
if (margo_handle->remote_addr != NULL)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

margo_handle->remote_addr != HG_ADDR_NULL

Comment thread src/dyad/dtl/margo_dtl.c

// both margo client and server
margo_addr_self (margo_handle->mid, &margo_handle->local_addr);
margo_handle->remote_addr = NULL;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

margo_handle->remote_addr = HG_ADDR_NULL

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants