Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test: add unit test for shared memory
Browse files Browse the repository at this point in the history
GuanLuo committed Oct 21, 2024
1 parent 519124f commit 4d7a0b8
Showing 3 changed files with 212 additions and 9 deletions.
182 changes: 182 additions & 0 deletions src/python/library/tests/test_shared_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import unittest

import numpy
import tritonclient.utils as utils
import tritonclient.utils.shared_memory as shm


class SharedMemoryTest(unittest.TestCase):
    """Unit tests for the system shared memory utilities.

    Every test appends the handles it creates to ``self.shm_handles`` so that
    ``tearDown`` can release them even when an assertion fails midway.
    """

    def setUp(self):
        # Handles created by the current test; released in tearDown.
        self.shm_handles = []

    def tearDown(self):
        for shm_handle in self.shm_handles:
            # [NOTE] wrapper for old implementation that will fail: tolerate
            # errors that mention "unlink", re-raise anything else untouched.
            try:
                shm.destroy_shared_memory_region(shm_handle)
            except shm.SharedMemoryException as ex:
                if "unlink" not in str(ex):
                    raise

    def test_lifecycle(self):
        """Create a region, round-trip a tensor through it, then destroy it."""
        cpu_tensor = numpy.ones([4, 4], dtype=numpy.float32)
        # Size the region to hold exactly the tensor (4 * 4 float32 = 64 bytes).
        byte_size = cpu_tensor.nbytes
        self.shm_handles.append(
            shm.create_shared_memory_region("shm_name", "shm_key", byte_size)
        )

        self.assertEqual(len(shm.mapped_shared_memory_regions()), 1)

        # Set data from Numpy array
        shm.set_shared_memory_region(self.shm_handles[0], [cpu_tensor])
        shm_tensor = shm.get_contents_as_numpy(
            self.shm_handles[0], numpy.float32, [4, 4]
        )

        self.assertTrue(numpy.allclose(cpu_tensor, shm_tensor))

        shm.destroy_shared_memory_region(self.shm_handles.pop(0))

    def test_set_region_offset(self):
        """Write a smaller tensor at an offset and read it back from there."""
        large_tensor = numpy.ones([4, 4], dtype=numpy.float32)
        large_size = large_tensor.nbytes
        self.shm_handles.append(
            shm.create_shared_memory_region("shm_name", "shm_key", large_size)
        )
        shm.set_shared_memory_region(self.shm_handles[0], [large_tensor])

        small_tensor = numpy.zeros([2, 4], dtype=numpy.float32)
        small_size = small_tensor.nbytes
        # Place the small tensor so that it ends exactly at the region's end.
        offset = large_size - small_size
        shm.set_shared_memory_region(
            self.shm_handles[0], [small_tensor], offset=offset
        )
        shm_tensor = shm.get_contents_as_numpy(
            self.shm_handles[0], numpy.float32, [2, 4], offset=offset
        )

        self.assertTrue(numpy.allclose(small_tensor, shm_tensor))

    # [NOTE] current impl will fail
    def test_set_region_oversize(self):
        """Writing a tensor larger than the region must raise."""
        large_tensor = numpy.ones([4, 4], dtype=numpy.float32)
        # Half of the tensor's 64 bytes, so the write cannot fit.
        small_size = 32
        self.shm_handles.append(
            shm.create_shared_memory_region("shm_name", "shm_key", small_size)
        )
        with self.assertRaises(shm.SharedMemoryException):
            shm.set_shared_memory_region(self.shm_handles[0], [large_tensor])

    def test_duplicate_key(self):
        """Creating a region with an existing key must raise by default."""
        # [NOTE] change in behavior:
        # previous: it was okay to create a shared memory region with the same
        #           key but a different size, and that behavior was never
        #           clearly studied.
        # now: only creation is allowed by default; a flag may be set to return
        #      a handle to the existing region instead, and a warning will be
        #      printed if the size is different.
        self.shm_handles.append(
            shm.create_shared_memory_region("shm_name", "shm_key", 32)
        )
        with self.assertRaises(shm.SharedMemoryException):
            # Still track the handle: if the call unexpectedly succeeds, the
            # region must be released in tearDown.
            self.shm_handles.append(
                shm.create_shared_memory_region("shm_name", "shm_key", 32)
            )

        # Get handle to the same shared memory region but with larger size
        # requested, check if actual size is checked
        self.shm_handles.append(
            shm.create_shared_memory_region("shm_name", "shm_key", 64, create=False)
        )

        self.assertEqual(len(shm.mapped_shared_memory_regions()), 1)

        large_tensor = numpy.ones([4, 4], dtype=numpy.float32)
        # [NOTE] current impl will fail
        with self.assertRaises(shm.SharedMemoryException):
            shm.set_shared_memory_region(self.shm_handles[-1], [large_tensor])

    # [NOTE] current impl will fail
    def test_destroy_duplicate(self):
        """Destroying several handles to one region only raises on request."""
        # [NOTE] change in behavior:
        # previous: raise an exception if the underlying shared memory has
        #           already been unlinked
        # now: the exception is suppressed to align with Windows behavior,
        #      unless explicitly toggled
        self.shm_handles.append(
            shm.create_shared_memory_region("shm_name", "shm_key", 64)
        )
        self.shm_handles.append(
            shm.create_shared_memory_region("shm_name", "shm_key", 32, create=False)
        )
        self.shm_handles.append(
            shm.create_shared_memory_region("shm_name", "shm_key", 32, create=False)
        )
        self.assertEqual(len(shm.mapped_shared_memory_regions()), 1)

        shm.destroy_shared_memory_region(self.shm_handles.pop(0))
        self.assertEqual(len(shm.mapped_shared_memory_regions()), 0)

        # Second destroy on the same key: the unlink error is suppressed.
        shm.destroy_shared_memory_region(self.shm_handles.pop(0))
        # Third destroy explicitly asks for the unlink error to be raised.
        with self.assertRaises(shm.SharedMemoryException):
            shm.destroy_shared_memory_region(
                self.shm_handles.pop(0), raise_unlink_exception=True
            )

    def test_numpy_bytes(self):
        """Round-trip a serialized byte-string tensor through shared memory."""
        int_tensor = numpy.arange(start=0, stop=16, dtype=numpy.int32)
        bytes_tensor = numpy.array(
            [str(x).encode("utf-8") for x in int_tensor.flatten()], dtype=object
        )
        bytes_tensor = bytes_tensor.reshape(int_tensor.shape)
        bytes_tensor_serialized = utils.serialize_byte_tensor(bytes_tensor)
        byte_size = utils.serialized_byte_size(bytes_tensor_serialized)

        self.shm_handles.append(
            shm.create_shared_memory_region("shm_name", "shm_key", byte_size)
        )

        # Set data from Numpy array
        shm.set_shared_memory_region(self.shm_handles[0], [bytes_tensor_serialized])

        shm_tensor = shm.get_contents_as_numpy(
            self.shm_handles[0],
            numpy.object_,
            [
                16,
            ],
        )

        self.assertTrue(numpy.array_equal(bytes_tensor, shm_tensor))


# Run the test suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()
30 changes: 25 additions & 5 deletions src/python/library/tritonclient/utils/shared_memory/__init__.py
Original file line number Diff line number Diff line change
@@ -90,7 +90,7 @@ def _raise_error(msg):
raise ex


def create_shared_memory_region(triton_shm_name, shm_key, byte_size):
def create_shared_memory_region(triton_shm_name, shm_key, byte_size, create=True):
"""Creates a system shared memory region with the specified name and size.
Parameters
@@ -113,6 +113,11 @@ def create_shared_memory_region(triton_shm_name, shm_key, byte_size):
If unable to create the shared memory region.
"""

if create and shm_key in mapped_shm_regions:
raise SharedMemoryException(
"unable to create the shared memory region, already exists"
)

shm_handle = c_void_p()
_raise_if_error(
c_int(
@@ -121,7 +126,9 @@ def create_shared_memory_region(triton_shm_name, shm_key, byte_size):
)
)
)
mapped_shm_regions.append(shm_key)

if create:
mapped_shm_regions.append(shm_key)

return shm_handle

@@ -271,7 +278,7 @@ def mapped_shared_memory_regions():
return mapped_shm_regions


def destroy_shared_memory_region(shm_handle):
def destroy_shared_memory_region(shm_handle, raise_unlink_exception=False):
"""Unlink a system shared memory region with the specified handle.
Parameters
@@ -306,8 +313,20 @@ def destroy_shared_memory_region(shm_handle):
# fail, a re-attempt could result in a segfault. Secondarily, if we
# fail to delete a region, we should not report it back to the user
# as a valid memory region.
mapped_shm_regions.remove(shm_key.value.decode("utf-8"))
_raise_if_error(c_int(_cshm_shared_memory_region_destroy(shm_handle)))
try:
mapped_shm_regions.remove(shm_key.value.decode("utf-8"))
except ValueError:
# It is okay if mapped_shm_regions doesn't have the key, as destroy may be
# called on several handles that share the same shared memory key.
pass
try:
_raise_if_error(c_int(_cshm_shared_memory_region_destroy(shm_handle)))
except SharedMemoryException as ex:
# Suppress the unlink exception unless the caller explicitly asks for it to be raised
if not raise_unlink_exception and "unlink" in str(ex):
pass
else:
raise ex
return


@@ -328,6 +347,7 @@ def __init__(self, err):
-4: "unable to read/mmap the shared memory region",
-5: "unable to unlink the shared memory region",
-6: "unable to munmap the shared memory region",
-7: "unable to set the shared memory region",
}
self._msg = None
if type(err) == str:
Original file line number Diff line number Diff line change
@@ -108,10 +108,11 @@ int
SharedMemoryRegionSet(
    void* shm_handle, size_t offset, size_t byte_size, const void* data)
{
  // Copies `byte_size` bytes from `data` into the region behind `shm_handle`,
  // starting `offset` bytes past its base address. Returns 0 on success and
  // -7 when the write would not fit inside the region.
  auto shm = reinterpret_cast<SharedMemoryHandle*>(shm_handle);
  // Bounds-check before copying, and without computing `offset + byte_size`,
  // which can wrap around for large values and slip past the check.
  // NOTE(review): assumes byte_size_ is an unsigned size (size_t) - confirm.
  if ((offset > shm->byte_size_) || (byte_size > (shm->byte_size_ - offset))) {
    return -7;
  }
  // Do the pointer arithmetic on char* so the offset is applied in bytes.
  char* shm_addr = reinterpret_cast<char*>(shm->base_addr_);
  std::memcpy(shm_addr + offset, data, byte_size);
  return 0;
}

0 comments on commit 4d7a0b8

Please sign in to comment.