diff --git a/.github/workflows/build-and-publish.yml b/.github/workflows/build-and-publish.yml
index 4cdad3e..0172db8 100644
--- a/.github/workflows/build-and-publish.yml
+++ b/.github/workflows/build-and-publish.yml
@@ -4,11 +4,14 @@ on:
push:
branches:
- master
- - dev
tags:
- '*'
+ pull_request:
+ branches:
+ - master
+
jobs:
build:
@@ -36,41 +39,54 @@ jobs:
if: ${{ env.IS_RELEASE == 'true' }}
run: python build/create_tag_body.py
- - name: Set up Python
- uses: actions/setup-python@v3
+ - name: Set up dotnet
+ uses: actions/setup-dotnet@v4
with:
- python-version: '3.9'
+ dotnet-version: "9.0.x"
+ dotnet-quality: "preview"
- - name: Install
- run: |
- sudo apt install jq
- npm install -g pyright
- python -m pip install build wheel pytest pytest-asyncio
- python -m pip install --pre --index-url https://www.myget.org/F/apollo3zehn-dev/python/ nexus-extensibility
+ # - name: Set up Python
+ # uses: actions/setup-python@v3
+ # with:
+ # python-version: '3.9'
- - name: Prepare
- run: |
- chmod +x tests/Nexus.Sources.Remote.Tests/bash/remote.sh
+ - name: Create Docker Output Folder
+ run: mkdir --parent artifacts/images
+
+ # - name: Install
+ # run: |
+ # npm install -g pyright
+ # python -m pip install build wheel pytest pytest-asyncio
+ # python -m pip install --pre --index-url https://www.myget.org/F/apollo3zehn-dev/python/ nexus-extensibility
+
+ - name: Docker Setup
+ id: buildx
+ uses: docker/setup-buildx-action@v1
- name: Build
run: |
- dotnet build -c Release /p:GeneratePackage=true src/remoting/dotnet-remoting/dotnet-remoting.csproj
- python -m build --wheel --outdir artifacts/package --no-isolation src/remoting/python-remoting
+ dotnet publish -c Release -o app /p:GeneratePackage=true src/Nexus.Agent/Nexus.Agent.csproj
+ # python -m build --wheel --outdir artifacts/package --no-isolation src/remoting/python-remoting
- name: Test
run: |
- dotnet test -c Release --filter TestCategory=local
- pyright
- pytest
- sudo bash tests/Nexus.Sources.Remote.Tests/SetupDockerTests.sh
+ dotnet test -c Release
+ # pyright
+ # pytest
+
+ - name: Docker Build
+ run: |
+ docker build -t nexus-main/nexus-agent:v_next -f src/Nexus.Agent/Dockerfile .
+ docker save --output artifacts/images/nexus_agent_image.tar nexus-main/nexus-agent:v_next
- name: Upload Artifacts
uses: actions/upload-artifact@v3
with:
name: artifacts
path: |
- artifacts/tag_body.txt
+ artifacts/*.txt
artifacts/package/
+ artifacts/images/
outputs:
is_release: ${{ env.IS_RELEASE }}
@@ -92,17 +108,30 @@ jobs:
name: artifacts
path: artifacts
- # GitHub Package Registry is broken by design: https://github.community/t/download-from-github-package-registry-without-authentication/14407/138
+ - name: Docker Load Image
+ run: docker load --input artifacts/images/nexus_agent_image.tar
+
- name: Nuget package (MyGet)
run: dotnet nuget push 'artifacts/package/release/*.nupkg' --api-key ${MYGET_API_KEY} --source https://www.myget.org/F/apollo3zehn-dev/api/v3/index.json
env:
MYGET_API_KEY: ${{ secrets.MYGET_API_KEY }}
- # GitHub Package Registry does not support Python packages: https://github.community/t/pypi-compatible-github-package-registry/14615
- - name: Python package (MyGet)
- run: 'for filePath in artifacts/package/*.whl; do curl -k -X POST https://www.myget.org/F/apollo3zehn-dev/python/upload -H "Authorization: Bearer ${MYGET_API_KEY}" -F "data=@$filePath"; done'
- env:
- MYGET_API_KEY: ${{ secrets.MYGET_API_KEY }}
+ # - name: Python package (MyGet)
+ # run: 'for filePath in artifacts/package/*.whl; do curl -k -X POST https://www.myget.org/F/apollo3zehn-dev/python/upload -H "Authorization: Bearer ${MYGET_API_KEY}" -F "data=@$filePath"; done'
+ # env:
+ # MYGET_API_KEY: ${{ secrets.MYGET_API_KEY }}
+
+ - name: Docker Login (Github Container Registry)
+ uses: docker/login-action@v1
+ with:
+ registry: ghcr.io
+ username: ${{ github.actor }}
+ password: ${{ secrets.GITHUB_TOKEN }}
+
+ - name: Docker Push
+ run: |
+ docker tag nexus-main/nexus-agent:v_next ghcr.io/nexus-main/nexus-agent:${{ needs.build.outputs.version }}
+ docker push ghcr.io/nexus-main/nexus-agent:${{ needs.build.outputs.version }}
publish_release:
@@ -114,9 +143,9 @@ jobs:
steps:
- - name: Install
- run: |
- python -m pip install twine
+ # - name: Install
+ # run: |
+ # python -m pip install twine
- name: Download Artifacts
uses: actions/download-artifact@v3
@@ -124,6 +153,9 @@ jobs:
name: artifacts
path: artifacts
+ - name: Docker Load Image
+ run: docker load --input artifacts/images/nexus_agent_image.tar
+
- name: GitHub Release Artifacts
uses: softprops/action-gh-release@v1
with:
@@ -134,7 +166,18 @@ jobs:
env:
NUGET_API_KEY: ${{ secrets.NUGET_API_KEY }}
- - name: Python Package (PyPI)
- run: twine upload artifacts/package/*.whl -u__token__ -p"${PYPI_API_KEY}"
- env:
- PYPI_API_KEY: ${{ secrets.PYPI_API_KEY }}
\ No newline at end of file
+ # - name: Python Package (PyPI)
+ # run: twine upload artifacts/package/*.whl -u__token__ -p"${PYPI_API_KEY}"
+ # env:
+ # PYPI_API_KEY: ${{ secrets.PYPI_API_KEY }}
+
+ - name: Docker Login (Docker Hub)
+ uses: docker/login-action@v1
+ with:
+ username: nexusmain
+ password: ${{ secrets.DOCKER_API_KEY }}
+
+ - name: Docker Push
+ run: |
+ docker tag nexus-main/nexus-agent:v_next nexusmain/nexus-agent:${{ needs.build.outputs.version }}
+ docker push nexusmain/nexus-agent:${{ needs.build.outputs.version }}
\ No newline at end of file
diff --git a/.nexus-agent/config/packages.json b/.nexus-agent/config/packages.json
new file mode 100644
index 0000000..b3fc53e
--- /dev/null
+++ b/.nexus-agent/config/packages.json
@@ -0,0 +1,10 @@
+{
+ "c05b592f-e198-472d-9902-3f60cf0a6332": {
+ "Provider": "local",
+ "Configuration": {
+ "path": "../../tests/Nexus.Sources.Remote.Tests/dotnet",
+ "version": "v1",
+ "csproj": "remote.csproj"
+ }
+ }
+}
\ No newline at end of file
diff --git a/.vscode/launch.json b/.vscode/launch.json
new file mode 100644
index 0000000..17d4fc0
--- /dev/null
+++ b/.vscode/launch.json
@@ -0,0 +1,19 @@
+{
+ "version": "0.2.0",
+ "configurations": [
+ {
+ "name": "Launch Nexus.Agent",
+ "type": "coreclr",
+ "request": "launch",
+ "preLaunchTask": "build-nexus-agent",
+ "program": "${workspaceFolder}/artifacts/bin/Nexus.Agent/debug/Nexus.Agent.dll",
+ "args": [],
+ "cwd": "${workspaceFolder}/src/Nexus.Agent",
+ "stopAtEntry": false,
+ "env": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "NEXUSAGENT_Paths__Config": "../../.nexus-agent/config"
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
index 1b88bd7..532fe2b 100644
--- a/.vscode/tasks.json
+++ b/.vscode/tasks.json
@@ -2,23 +2,23 @@
"version": "2.0.0",
"tasks": [
{
- "label": "build",
+ "label": "build-nexus-agent",
"command": "dotnet",
"type": "process",
"args": [
"build",
- "${workspaceFolder}/src/Nexus.Sources.Remote/Nexus.Sources.Remote.csproj",
+ "${workspaceFolder}/src/Nexus.Agent/Nexus.Agent.csproj",
"/property:GenerateFullPaths=true",
"/consoleloggerparameters:NoSummary"
],
"problemMatcher": "$msCompile"
},
{
- "label": "publish",
+ "label": "build",
"command": "dotnet",
"type": "process",
"args": [
- "publish",
+ "build",
"${workspaceFolder}/src/Nexus.Sources.Remote/Nexus.Sources.Remote.csproj",
"/property:GenerateFullPaths=true",
"/consoleloggerparameters:NoSummary"
@@ -26,14 +26,14 @@
"problemMatcher": "$msCompile"
},
{
- "label": "watch",
+ "label": "publish",
"command": "dotnet",
"type": "process",
"args": [
- "watch",
- "run",
- "--project",
- "${workspaceFolder}/src/Nexus.Sources.Remote/Nexus.Sources.Remote.csproj"
+ "publish",
+ "${workspaceFolder}/src/Nexus.Sources.Remote/Nexus.Sources.Remote.csproj",
+ "/property:GenerateFullPaths=true",
+ "/consoleloggerparameters:NoSummary"
],
"problemMatcher": "$msCompile"
}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 176b3e0..ecde93e 100755
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## v2.0.0-beta.39 - 2025-01-08
+
+- Release Nexus.Agent.
+
## v2.0.0-beta.24 - 2024-03-15
- Follow Nexus changes.
diff --git a/Directory.Build.props b/Directory.Build.props
index db678da..eeb7e32 100755
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -2,7 +2,7 @@
true
- net8.0
+ net9.0
enable
enable
true
diff --git a/Remote.sln b/Remote.sln
index b95dc25..7d46acc 100644
--- a/Remote.sln
+++ b/Remote.sln
@@ -19,6 +19,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "benchmarks", "benchmarks",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Nexus.Benchmarks", "benchmarks\Nexus.Benchmarks\Nexus.Benchmarks.csproj", "{B602750B-5E89-453F-841B-8D7F22DE58EB}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Nexus.Agent", "src\Nexus.Agent\Nexus.Agent.csproj", "{71F41120-5BC2-4684-ADB7-C8C97CCE8EAF}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -41,6 +43,10 @@ Global
{B602750B-5E89-453F-841B-8D7F22DE58EB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B602750B-5E89-453F-841B-8D7F22DE58EB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B602750B-5E89-453F-841B-8D7F22DE58EB}.Release|Any CPU.Build.0 = Release|Any CPU
+ {71F41120-5BC2-4684-ADB7-C8C97CCE8EAF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {71F41120-5BC2-4684-ADB7-C8C97CCE8EAF}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {71F41120-5BC2-4684-ADB7-C8C97CCE8EAF}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {71F41120-5BC2-4684-ADB7-C8C97CCE8EAF}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -51,6 +57,7 @@ Global
{59CBCE56-7832-4B55-854F-C9B5E4FBEA86} = {F226553B-2E5B-4910-AF31-0B718872AF18}
{FC047115-277B-448E-854F-275B5B8343A5} = {59CBCE56-7832-4B55-854F-C9B5E4FBEA86}
{B602750B-5E89-453F-841B-8D7F22DE58EB} = {40C5CF08-E45E-4F25-BE8F-B5BBD4B0EDB3}
+ {71F41120-5BC2-4684-ADB7-C8C97CCE8EAF} = {F226553B-2E5B-4910-AF31-0B718872AF18}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {B035E550-8A68-427B-92F1-9B1A6977BF8F}
diff --git a/benchmarks/Nexus.Benchmarks/Nexus.Benchmarks.csproj b/benchmarks/Nexus.Benchmarks/Nexus.Benchmarks.csproj
index 9857d80..a34fbf9 100644
--- a/benchmarks/Nexus.Benchmarks/Nexus.Benchmarks.csproj
+++ b/benchmarks/Nexus.Benchmarks/Nexus.Benchmarks.csproj
@@ -6,7 +6,7 @@
-
+
diff --git a/benchmarks/Nexus.Benchmarks/PipeVsTcp.cs b/benchmarks/Nexus.Benchmarks/PipeVsTcp.cs
index 53689fa..86376d5 100644
--- a/benchmarks/Nexus.Benchmarks/PipeVsTcp.cs
+++ b/benchmarks/Nexus.Benchmarks/PipeVsTcp.cs
@@ -26,7 +26,7 @@ public void GlobalSetup()
{
Arguments = $"{assemblyPath} pipe",
UseShellExecute = false,
- RedirectStandardInput = true,
+ RedirectStandardInput = false,
RedirectStandardOutput = true,
RedirectStandardError = true
};
diff --git a/setup/README.md b/setup/README.md
deleted file mode 100644
index 9590aff..0000000
--- a/setup/README.md
+++ /dev/null
@@ -1,18 +0,0 @@
-# How to run the test:
-
-1. `cd` into the `setup/docker` folder
-
-2. Run the follow on the docker host:
-
-```sh
-sudo bash setup-host.sh python
-sudo bash setup-host.sh dotnet
-sudo docker-compose up -d
-```
-
-3. Wait a few seconds until the containers are running, then test the connection between both containers:
-
-```sh
-sudo docker exec nexus-main \
- bash -c "cd /root/nexus-sources-remote; dotnet test --filter Nexus.Sources.Tests.SetupDockerTests"
-```
\ No newline at end of file
diff --git a/setup/docker/docker-compose.yml b/setup/docker/docker-compose.yml
deleted file mode 100644
index 1e7b3f9..0000000
--- a/setup/docker/docker-compose.yml
+++ /dev/null
@@ -1,27 +0,0 @@
-version: "3.7"
-
-services:
-
- main:
- container_name: nexus-main
- image: mcr.microsoft.com/dotnet/sdk:8.0
- volumes:
- - /var/lib/nexus/docker/nexus-main/.ssh:/root/.ssh
- entrypoint: bash -c "cd; curl -s -O 'https://raw.githubusercontent.com/nexus-main/nexus-sources-remote/master/setup/docker/setup-main.sh'; source 'setup-main.sh'"
- restart: always
-
- satellite-python:
- container_name: nexus-python
- image: python:3.9.0
- volumes:
- - /var/lib/nexus/docker/nexus-python/.ssh:/root/.ssh
- entrypoint: bash -c "satellite_id=python;cd; curl -s -O 'https://raw.githubusercontent.com/nexus-main/nexus-sources-remote/master/setup/docker/setup-satellite.sh'; source 'setup-satellite.sh'"
- restart: always
-
- satellite-nexus:
- container_name: nexus-dotnet
- image: mcr.microsoft.com/dotnet/sdk:8.0
- volumes:
- - /var/lib/nexus/docker/nexus-dotnet/.ssh:/root/.ssh
- entrypoint: bash -c "satellite_id=dotnet;cd; curl -s -O 'https://raw.githubusercontent.com/nexus-main/nexus-sources-remote/master/setup/docker/setup-satellite.sh'; source 'setup-satellite.sh'"
- restart: always
diff --git a/setup/docker/dotnet/satellite.sh b/setup/docker/dotnet/satellite.sh
deleted file mode 100644
index 5f16811..0000000
--- a/setup/docker/dotnet/satellite.sh
+++ /dev/null
@@ -1,27 +0,0 @@
-red=$'\e[0;31m'
-green=$'\e[0;32m'
-orange=$'\e[0;33m'
-white=$'\e[0m'
-
-echo "${green}Welcome to the dotnet satellite.sh script!${white}"
-project=$3
-
-if [[ -f "../tag_changed" || ! -f "../build_successful" ]]; then
-
- echo "Build project ${green}${project}${white}"
- dotnet build -c Release ${project}
-
- if [[ $? == 0 ]]; then
- touch "../build_successful"
- else
- echo "${red}Build failed${white}"
- rm --force "../build_successful"
- fi
-fi
-
-# run user code
-shift
-shift
-shift
-echo "Run command ${green}dotnet run -c Release --no-build --project ${project} -- $@${white}"
-dotnet run -c Release --no-build --project ${project} -- $@
diff --git a/setup/docker/python/satellite.sh b/setup/docker/python/satellite.sh
deleted file mode 100644
index 2bdfdb0..0000000
--- a/setup/docker/python/satellite.sh
+++ /dev/null
@@ -1,34 +0,0 @@
-green=$'\e[0;32m'
-orange=$'\e[0;33m'
-white=$'\e[0m'
-
-echo "${green}Welcome to the python satellite.sh script!${white}"
-
-# virtual environment (!!DO NOT QUOTE TO ALLOW TILDE EXPANSION!!)
-env=~/venv
-
-if [ ! -d $env ]; then
- echo "Create virtual environment ${green}${env}${white}"
- python3 -m venv $env
-fi
-
-echo "Activate virtual environment ${green}${env}${white}"
-source $env/bin/activate
-
-# requirements
-if [ -f "requirements.txt" ]; then
-
- if [ -f "../tag_changed" ]; then
- echo "${green}Install requirements${white}"
- python -m pip install --pre --index-url https://pypi.python.org/simple --extra-index-url https://www.myget.org/F/apollo3zehn-dev/python/ -r "requirements.txt" --disable-pip-version-check
- fi
-
-else
- echo "${orange}No requirements.txt found${white}"
-fi
-
-# run user code
-shift
-shift
-echo "Run command ${green}python $@${white}"
-python $@
diff --git a/setup/docker/run-user.sh b/setup/docker/run-user.sh
deleted file mode 100644
index 0f67103..0000000
--- a/setup/docker/run-user.sh
+++ /dev/null
@@ -1,39 +0,0 @@
-green=$'\e[0;32m'
-orange=$'\e[0;33m'
-white=$'\e[0m'
-
-echo "Continue as: ${green}$(whoami)${white}"
-
-# check if git project exists
-if [[ ! -d 'repository' ]]; then
- mkdir -p 'repository'
-fi
-
-cd 'repository'
-
-(
- clone_required=false
-
- if [ -d '.git' ]; then
- current_tag=$(git describe --tags)
- echo "Current tag is ${green}${current_tag}${white}"
-
- if [ "$current_tag" != "$2" ]; then
- rm --force -r .* * 2> /dev/null
- clone_required=true
- fi
- else
- rm --force -r .* * 2> /dev/null
- clone_required=true
- fi
-
- if [[ "$clone_required" = true ]]; then
- echo "Clone repository ${orange}$1${white} @ ${orange}$2${white}"
- git clone -c advice.detachedHead=false --depth 1 --branch $2 $1 .
- touch "../tag_changed"
- else
- rm --force "../tag_changed"
- fi
-) 100>"/tmp/run-user-$(whoami).lock"
-
-source "../satellite.sh"
diff --git a/setup/docker/run.sh b/setup/docker/run.sh
deleted file mode 100644
index 72cb823..0000000
--- a/setup/docker/run.sh
+++ /dev/null
@@ -1,52 +0,0 @@
-red=$'\e[0;31m'
-green=$'\e[0;32m'
-orange=$'\e[0;33m'
-white=$'\e[0m'
-
-locked()
-{
- >&2 echo "${red}Unable to acquire the file lock${white}"
- exit 1
-}
-
-if (( $# < 2 )); then
- >&2 echo "${red}Illegal number of parameters${white}"
- exit 1
-fi
-
-echo "The git-url is: ${green}$1${white}"
-
-# derive user id
-user_id=$(echo -n $1 | openssl dgst -binary -md5 | openssl base64)
-user_id=${user_id//[\/=]/_}
-echo "Derived user id: ${green}$user_id${white}"
-
-(
- # get lock (wait max 10 s)
- flock -w 10 100 || locked
-
- # get or add user
- if id $user_id &>/dev/null; then
- echo "User ${green}exists${white}"
- # password=$(<"password-store/$user_id")
- else
- echo "User ${orange}does not exist${white}"
- password=$(cat /dev/urandom | tr -dc a-zA-Z0-9 | fold -w 14 | head -n 1)
- mkdir --parents "password-store"
- echo $password > "password-store/$user_id"
- useradd --password $password $user_id
- echo "Created user ${green}$user_id${white}"
- fi
-
- # prepare user folder
- mkdir --parents "/home/$user_id"
- cp --force "run-user.sh" "/home/$user_id/run-user.sh"
- cp --force "satellite.sh" "/home/$user_id/satellite.sh"
- chown --recursive $user_id:$user_id "/home/$user_id"
-
-) 100>"/tmp/run-$user_id.lock"
-
-# continue as $user_id
-cd "/home/$user_id"
-command="bash run-user.sh $@"
-su $user_id -c "$command"
\ No newline at end of file
diff --git a/setup/docker/setup-host.sh b/setup/docker/setup-host.sh
deleted file mode 100644
index 4eec85c..0000000
--- a/setup/docker/setup-host.sh
+++ /dev/null
@@ -1,22 +0,0 @@
-green=$'\e[0;32m'
-white=$'\e[0m'
-
-satellite_id=$1
-
-# Generate key for main container and add config file, but do not override if it already exists
-main_folder='/var/lib/nexus/docker/nexus-main'
-
-if [[ ! -d ${main_folder}/.ssh ]]; then
- echo "Generate SSH key for container ${green}nexus-main${white}"
- mkdir -p "${main_folder}/.ssh"
- ssh-keygen -q -t rsa -N '' -f "${main_folder}/.ssh/id_rsa" <</dev/null 2>&1
- echo "StrictHostKeyChecking no" > "${main_folder}/.ssh/config"
- echo "UserKnownHostsFile=/dev/null" >> "${main_folder}/.ssh/config"
-fi
-
-# Generate key for satellite container and add main container key to authorized keys file
-echo "Generate SSH key for container ${green}nexus-${satellite_id}${white}"
-satellite_folder="/var/lib/nexus/docker/nexus-${satellite_id}"
-mkdir -p "${satellite_folder}/.ssh"
-ssh-keygen -q -t rsa -N '' -f "${satellite_folder}/.ssh/id_rsa" <</dev/null 2>&1
-cat "${main_folder}/.ssh/id_rsa.pub" > "${satellite_folder}/.ssh/authorized_keys"
diff --git a/setup/docker/setup-main.sh b/setup/docker/setup-main.sh
deleted file mode 100644
index 37454ac..0000000
--- a/setup/docker/setup-main.sh
+++ /dev/null
@@ -1,19 +0,0 @@
-set -e
-
-green=$'\e[0;32m'
-white=$'\e[0m'
-echo "${green}Welcome to the setup-main.sh script"'!'"${white}"
-
-echo "${green}Set up SSH client${white}"
-apt update
-apt install openssh-client -y
-
-echo "Clone repository ${green}https://github.com/nexus-main/nexus-sources-remote${white}"
-git clone https://github.com/nexus-main/nexus-sources-remote
-cd nexus-sources-remote
-
-mkdir -p /var/lib/nexus
-touch "/var/lib/nexus/ready"
-
-trap : TERM INT
-sleep infinity & wait
diff --git a/setup/docker/setup-satellite.sh b/setup/docker/setup-satellite.sh
deleted file mode 100644
index f9c1907..0000000
--- a/setup/docker/setup-satellite.sh
+++ /dev/null
@@ -1,22 +0,0 @@
-set -e
-
-green=$'\e[0;32m'
-white=$'\e[0m'
-echo "${green}Welcome to the setup-${satellite_id}.sh script"'!'"${white}"
-
-echo "${green}Set up SSH server${white}"
-apt update
-apt install openssh-server -y
-sed -i 's/#PermitRootLogin prohibit-password/PermitRootLogin yes/g' '/etc/ssh/sshd_config'
-service ssh start
-
-echo "${green}Load run.sh, run-user.sh and ${satellite_id}/satellite.sh scripts${white}"
-curl -s -O 'https://raw.githubusercontent.com/nexus-main/nexus-sources-remote/master/setup/docker/run.sh'
-curl -s -O 'https://raw.githubusercontent.com/nexus-main/nexus-sources-remote/master/setup/docker/run-user.sh'
-curl -s -O "https://raw.githubusercontent.com/nexus-main/nexus-sources-remote/master/setup/docker/${satellite_id}/satellite.sh"
-
-mkdir -p /var/lib/nexus
-touch "/var/lib/nexus/ready"
-
-trap : TERM INT
-sleep infinity & wait
diff --git a/src/Nexus.Agent/Controllers/PackageReferencesController.cs b/src/Nexus.Agent/Controllers/PackageReferencesController.cs
new file mode 100644
index 0000000..1637619
--- /dev/null
+++ b/src/Nexus.Agent/Controllers/PackageReferencesController.cs
@@ -0,0 +1,82 @@
+// MIT License
+// Copyright (c) [2024] [nexus-main]
+
+using Asp.Versioning;
+using Microsoft.AspNetCore.Mvc;
+using Nexus.PackageManagement.Services;
+using Nexus.PackageManagement;
+
+namespace Nexus.Controllers;
+
+///
+/// Provides access to package references.
+///
+[ApiController]
+[ApiVersion("1.0")]
+[Route("api/v{version:apiVersion}/[controller]")]
+internal class PackageReferencesController(
+ IPackageService packageService,
+ IExtensionHive extensionHive) : ControllerBase
+{
+ // GET /api/packagereferences
+ // POST /api/packagereferences
+ // DELETE /api/packagereferences/{id}
+ // GET /api/packagereferences/{id}/versions
+
+ private readonly IPackageService _packageService = packageService;
+
+ private readonly IExtensionHive _extensionHive = extensionHive;
+
+ ///
+ /// Gets the list of package references.
+ ///
+ ///
+ [HttpGet]
+ public Task> GetAsync()
+ {
+ return _packageService.GetAllAsync();
+ }
+
+ ///
+ /// Creates a package reference.
+ ///
+ /// The package reference to create.
+ [HttpPost]
+ public Task CreateAsync(
+ [FromBody] PackageReference packageReference)
+ {
+ return _packageService.PutAsync(packageReference);
+ }
+
+ ///
+ /// Deletes a package reference.
+ ///
+ /// The ID of the package reference.
+ [HttpDelete("{id}")]
+ public Task DeleteAsync(
+ Guid id)
+ {
+ return _packageService.DeleteAsync(id);
+ }
+
+ ///
+ /// Gets package versions.
+ ///
+ /// The ID of the package reference.
+ /// A token to cancel the current operation.
+ [HttpGet("{id}/versions")]
+ public async Task> GetVersionsAsync(
+ Guid id,
+ CancellationToken cancellationToken)
+ {
+ var packageReferenceMap = await _packageService.GetAllAsync();
+
+ if (!packageReferenceMap.TryGetValue(id, out var packageReference))
+ return NotFound($"Unable to find package reference with ID {id}.");
+
+ var result = await _extensionHive
+ .GetVersionsAsync(packageReference, cancellationToken);
+
+ return result;
+ }
+}
diff --git a/src/Nexus.Agent/Core/AgentService.cs b/src/Nexus.Agent/Core/AgentService.cs
new file mode 100644
index 0000000..6f003d5
--- /dev/null
+++ b/src/Nexus.Agent/Core/AgentService.cs
@@ -0,0 +1,178 @@
+using System.Collections.Concurrent;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using Nexus.Extensibility;
+using Nexus.PackageManagement.Services;
+using Nexus.Remoting;
+
+namespace Nexus.Agent;
+
+public class TcpClientPair
+{
+ public NetworkStream? Comm { get; set; }
+
+ public NetworkStream? Data { get; set; }
+
+ public RemoteCommunicator? RemoteCommunicator { get; set; }
+}
+
+internal class AgentService
+{
+ private readonly Lock _lock = new();
+
+ private readonly ConcurrentDictionary _tcpClientPairs = new();
+
+ private readonly IExtensionHive _extensionHive;
+
+ private readonly IPackageService _packageService;
+
+ private readonly ILogger _agentLogger;
+
+ public AgentService(
+ IExtensionHive extensionHive,
+ IPackageService packageService,
+ ILogger agentLogger)
+ {
+ _extensionHive = extensionHive;
+ _packageService = packageService;
+ _agentLogger = agentLogger;
+ }
+
+ public async Task LoadPackagesAsync(CancellationToken cancellationToken)
+ {
+ _agentLogger.LogInformation("Load packages");
+
+ var packageReferenceMap = await _packageService.GetAllAsync();
+ var progress = new Progress();
+
+ await _extensionHive.LoadPackagesAsync(
+ packageReferenceMap: packageReferenceMap,
+ progress,
+ cancellationToken
+ );
+ }
+
+ public Task AcceptClientsAsync(CancellationToken cancellationToken)
+ {
+ var tcpListener = new TcpListener(IPAddress.Any, 56145);
+ tcpListener.Start();
+
+ return Task.Run(async () =>
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var client = await tcpListener.AcceptTcpClientAsync(cancellationToken);
+
+ _ = Task.Run(async () =>
+ {
+ if (!client.Connected)
+ throw new Exception("client is not connected");
+
+ var streamReadCts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+ var networkStream = client.GetStream(); /* no using because it will close the TCP client */
+
+ // get connection id
+ var buffer1 = new byte[36];
+ await networkStream.ReadExactlyAsync(buffer1, streamReadCts.Token);
+ var idString = Encoding.UTF8.GetString(buffer1);
+
+ // get connection type
+ var buffer2 = new byte[4];
+ await networkStream.ReadExactlyAsync(buffer2, streamReadCts.Token);
+ var typeString = Encoding.UTF8.GetString(buffer2);
+
+ if (Guid.TryParse(Encoding.UTF8.GetString(buffer1), out var id))
+ {
+ _agentLogger.LogDebug("Accept TCP client with connection ID {ConnectionId} and communication type {CommunicationType}", idString, typeString);
+
+ var tcpPairCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+
+ // Handle the timeout event
+ tcpPairCts.Token.Register(() =>
+ {
+ // If TCP client pair can be found ...
+ if (_tcpClientPairs.TryGetValue(id, out var pair))
+ {
+ // and if TCP client pair is not yet complete ...
+ if (pair.Comm is null || pair.Data is null)
+ {
+ // then dispose and remove the clients and the pair
+ pair.Comm?.Dispose();
+ pair.Data?.Dispose();
+
+ _tcpClientPairs.Remove(id, out _);
+ }
+ }
+ });
+
+ // We got a "comm" tcp connection
+ if (typeString == "comm")
+ {
+ _tcpClientPairs.AddOrUpdate(
+ id,
+ addValueFactory: id => new TcpClientPair { Comm = networkStream },
+ updateValueFactory: (id, pair) =>
+ {
+ pair.Comm?.Dispose();
+ pair.Comm = networkStream;
+ return pair;
+ }
+ );
+ }
+
+ // We got a "data" tcp connection
+ else if (typeString == "data")
+ {
+ _tcpClientPairs.AddOrUpdate(
+ id,
+ addValueFactory: id => new TcpClientPair { Data = networkStream },
+ updateValueFactory: (id, pair) =>
+ {
+ pair.Data?.Dispose();
+ pair.Data = networkStream;
+ return pair;
+ }
+ );
+ }
+
+ // Something went wrong, dispose the network stream and return
+ else
+ {
+ networkStream.Dispose();
+ return;
+ }
+
+ var pair = _tcpClientPairs[id];
+
+ lock (_lock)
+ {
+ if (pair.Comm is not null && pair.Data is not null && pair.RemoteCommunicator is null)
+ {
+ _agentLogger.LogDebug("Accept remoting client with connection ID {ConnectionId}", id);
+
+ try
+ {
+ pair.RemoteCommunicator = new RemoteCommunicator(
+ pair.Comm,
+ pair.Data,
+ getDataSource: type => _extensionHive.GetInstance(type)
+ );
+
+ _ = pair.RemoteCommunicator.RunAsync();
+ }
+ catch
+ {
+ pair.Comm?.Dispose();
+ pair.Data?.Dispose();
+
+ throw;
+ }
+ }
+ }
+ }
+ });
+ }
+ });
+ }
+}
\ No newline at end of file
diff --git a/src/Nexus.Agent/Core/InternalControllerFeatureProvider.cs b/src/Nexus.Agent/Core/InternalControllerFeatureProvider.cs
new file mode 100644
index 0000000..2bc8aa8
--- /dev/null
+++ b/src/Nexus.Agent/Core/InternalControllerFeatureProvider.cs
@@ -0,0 +1,49 @@
+// MIT License
+// Copyright (c) [2024] [nexus-main]
+
+using Microsoft.AspNetCore.Mvc;
+using Microsoft.AspNetCore.Mvc.ApplicationParts;
+using Microsoft.AspNetCore.Mvc.Controllers;
+using System.Reflection;
+
+namespace Nexus.PackageManagement.Core;
+
+internal class InternalControllerFeatureProvider : IApplicationFeatureProvider
+{
+ private const string ControllerTypeNameSuffix = "Controller";
+
+ public void PopulateFeature(IEnumerable parts, ControllerFeature feature)
+ {
+ foreach (var part in parts.OfType())
+ {
+ foreach (var type in part.Types)
+ {
+ if (IsController(type) && !feature.Controllers.Contains(type))
+ {
+ feature.Controllers.Add(type);
+ }
+ }
+ }
+ }
+
+ protected virtual bool IsController(TypeInfo typeInfo)
+ {
+ if (!typeInfo.IsClass)
+ return false;
+
+ if (typeInfo.IsAbstract)
+ return false;
+
+ if (typeInfo.ContainsGenericParameters)
+ return false;
+
+ if (typeInfo.IsDefined(typeof(NonControllerAttribute)))
+ return false;
+
+ if (!typeInfo.Name.EndsWith(ControllerTypeNameSuffix, StringComparison.OrdinalIgnoreCase) &&
+ !typeInfo.IsDefined(typeof(ControllerAttribute)))
+ return false;
+
+ return true;
+ }
+}
diff --git a/src/Nexus.Agent/Core/Options.cs b/src/Nexus.Agent/Core/Options.cs
new file mode 100644
index 0000000..e75b624
--- /dev/null
+++ b/src/Nexus.Agent/Core/Options.cs
@@ -0,0 +1,44 @@
+using System.Runtime.InteropServices;
+
+namespace Nexus.Core;
+
+internal abstract record NexusAgentOptions()
+{
+ // for testing only
+ public string? BlindSample { get; set; }
+
+ internal static IConfiguration BuildConfiguration()
+ {
+ var environmentName = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT");
+
+ var builder = new ConfigurationBuilder()
+ .AddJsonFile("appsettings.json");
+
+ if (!string.IsNullOrWhiteSpace(environmentName))
+ {
+ builder
+ .AddJsonFile($"appsettings.{environmentName}.json", optional: true, reloadOnChange: true);
+ }
+
+ builder.AddEnvironmentVariables(prefix: "NEXUSAGENT_");
+
+ return builder.Build();
+ }
+}
+
+internal record PathsOptions : IPackageManagementPathsOptions
+{
+ public const string Section = "Paths";
+
+ public string Config { get; set; } = Path.Combine(PlatformSpecificRoot, "config");
+
+ public string Packages { get; set; } = Path.Combine(PlatformSpecificRoot, "packages");
+
+ #region Support
+
+ private static string PlatformSpecificRoot { get; } = RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
+ ? Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData), "nexus-agent")
+ : Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.UserProfile), ".local", "share", "nexus-agent");
+
+ #endregion
+}
\ No newline at end of file
diff --git a/src/Nexus.Agent/Dockerfile b/src/Nexus.Agent/Dockerfile
new file mode 100644
index 0000000..c42f6d0
--- /dev/null
+++ b/src/Nexus.Agent/Dockerfile
@@ -0,0 +1,6 @@
+FROM mcr.microsoft.com/dotnet/sdk:9.0
+WORKDIR /app
+COPY app .
+
+USER app
+ENTRYPOINT ["./Nexus.Agent"]
\ No newline at end of file
diff --git a/src/Nexus.Agent/Nexus.Agent.csproj b/src/Nexus.Agent/Nexus.Agent.csproj
new file mode 100644
index 0000000..a0b15d2
--- /dev/null
+++ b/src/Nexus.Agent/Nexus.Agent.csproj
@@ -0,0 +1,21 @@
+
+
+
+ $(TargetFrameworkVersion)
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Nexus.Agent/Program.cs b/src/Nexus.Agent/Program.cs
new file mode 100644
index 0000000..0a6ab05
--- /dev/null
+++ b/src/Nexus.Agent/Program.cs
@@ -0,0 +1,72 @@
+// TODO
+// - cancellation (RemoteCommunicator.RunAsync)
+// - client logout / timeout
+// - listen to localhost by default, make it configurable
+// - "src/Nexus.Agent" -> "src/agent/dotnet-agent/..."?
+// - Podman container tests (or remove container and just provide build script = Dockerfile?)
+// - use nginx basic auth to protect page? Or using a token provided by ENV variable?
+
+using Asp.Versioning;
+using Microsoft.Extensions.Options;
+using Nexus.Agent;
+using Nexus.Core;
+using Nexus.PackageManagement.Core;
+using Scalar.AspNetCore;
+
+var builder = WebApplication.CreateBuilder();
+
+var configuration = NexusAgentOptions.BuildConfiguration();
+builder.Configuration.AddConfiguration(configuration);
+
+builder.Services
+
+ .AddOpenApi()
+ // .AddOpenApi("v2")
+
+ .AddApiVersioning(config =>
+ {
+ config.ReportApiVersions = true;
+ config.ApiVersionReader = new UrlSegmentApiVersionReader();
+ })
+
+ .AddApiExplorer(config =>
+ {
+ config.GroupNameFormat = "'v'VVV";
+ config.SubstituteApiVersionInUrl = true;
+ });
+
+builder.Services
+ .AddControllers()
+ .ConfigureApplicationPartManager(
+ manager => manager.FeatureProviders.Add(new InternalControllerFeatureProvider())
+ );
+
+builder.Services
+ .AddSingleton();
+
+builder.Services.Configure(configuration.GetSection(PathsOptions.Section));
+
+// Package management
+builder.Services.AddPackageManagement();
+
+builder.Services.AddSingleton(
+ serviceProvider => serviceProvider.GetRequiredService>().Value);
+
+var app = builder.Build();
+
+app.MapGet("/", () => Results.Redirect("/scalar/v1")).ExcludeFromDescription();
+app.MapOpenApi();
+app.MapScalarApiReference();
+app.MapControllers();
+
+var pathsOptions = app.Services.GetRequiredService>();
+
+var logger = app.Services.GetRequiredService>();
+logger.LogInformation("Current directory: {CurrentDirectory}", Environment.CurrentDirectory);
+logger.LogInformation("Loading configuration from path: {ConfigFolderPath}", pathsOptions.Value.Config);
+
+var agent = app.Services.GetRequiredService();
+await agent.LoadPackagesAsync(CancellationToken.None);
+_ = agent.AcceptClientsAsync(CancellationToken.None);
+
+app.Run();
diff --git a/src/Nexus.Agent/Properties/launchSettings.json b/src/Nexus.Agent/Properties/launchSettings.json
new file mode 100644
index 0000000..24838d3
--- /dev/null
+++ b/src/Nexus.Agent/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "https://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "http": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "applicationUrl": "http://localhost:5000",
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development"
+ }
+ }
+ }
+}
diff --git a/src/Nexus.Agent/appsettings.Development.json b/src/Nexus.Agent/appsettings.Development.json
new file mode 100644
index 0000000..ff66ba6
--- /dev/null
+++ b/src/Nexus.Agent/appsettings.Development.json
@@ -0,0 +1,8 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.AspNetCore": "Warning"
+ }
+ }
+}
diff --git a/src/Nexus.Agent/appsettings.json b/src/Nexus.Agent/appsettings.json
new file mode 100644
index 0000000..47820ef
--- /dev/null
+++ b/src/Nexus.Agent/appsettings.json
@@ -0,0 +1,13 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.AspNetCore": "Warning",
+ "Nexus": "Information"
+ }
+ },
+ "AllowedHosts": "*",
+ "Paths": {
+ "BlindSample": "Paths"
+ }
+}
diff --git a/src/Nexus.Sources.Remote/DataSourceTypes.cs b/src/Nexus.Sources.Remote/DataSourceTypes.cs
index 51a7a8a..57eb2d4 100644
--- a/src/Nexus.Sources.Remote/DataSourceTypes.cs
+++ b/src/Nexus.Sources.Remote/DataSourceTypes.cs
@@ -12,13 +12,13 @@ public Task
GetApiVersionAsync(CancellationToken cancellationToken);
public Task
- SetContextAsync(DataSourceContext context, CancellationToken cancellationToken);
+ SetContextAsync(string type, DataSourceContext context, CancellationToken cancellationToken);
public Task
GetCatalogRegistrationsAsync(string path, CancellationToken cancellationToken);
public Task
- GetCatalogAsync(string catalogId, CancellationToken cancellationToken);
+ EnrichCatalogAsync(ResourceCatalog catalog, CancellationToken cancellationToken);
public Task
GetTimeRangeAsync(string catalogId, CancellationToken cancellationToken);
@@ -27,7 +27,7 @@ public Task
GetAvailabilityAsync(string catalogId, DateTime begin, DateTime end, CancellationToken cancellationToken);
public Task
- ReadSingleAsync(DateTime begin, DateTime end, CatalogItem catalogItem, CancellationToken cancellationToken);
+ ReadSingleAsync(DateTime begin, DateTime end, string originalResourceName, CatalogItem catalogItem, CancellationToken cancellationToken);
}
internal record ApiVersionResponse(int ApiVersion);
@@ -54,7 +54,12 @@ public override bool CanConvert(Type objectType)
return canConvert;
}
- public override object? ReadJson(Newtonsoft.Json.JsonReader reader, Type objectType, object? existingValue, Newtonsoft.Json.JsonSerializer serializer)
+ public override object? ReadJson(
+ Newtonsoft.Json.JsonReader reader,
+ Type objectType,
+ object? existingValue,
+ Newtonsoft.Json.JsonSerializer serializer
+ )
{
if (reader.TokenType == Newtonsoft.Json.JsonToken.Null)
return default;
diff --git a/src/Nexus.Sources.Remote/Nexus.Sources.Remote.csproj b/src/Nexus.Sources.Remote/Nexus.Sources.Remote.csproj
index f3b3844..3387817 100644
--- a/src/Nexus.Sources.Remote/Nexus.Sources.Remote.csproj
+++ b/src/Nexus.Sources.Remote/Nexus.Sources.Remote.csproj
@@ -11,11 +11,11 @@
-
+
runtime;native
-
+
diff --git a/src/Nexus.Sources.Remote/Remote.cs b/src/Nexus.Sources.Remote/Remote.cs
index 9ea341f..527678f 100644
--- a/src/Nexus.Sources.Remote/Remote.cs
+++ b/src/Nexus.Sources.Remote/Remote.cs
@@ -12,20 +12,16 @@ namespace Nexus.Sources;
[ExtensionDescription(
"Provides access to remote databases",
"https://github.com/nexus-main/nexus-sources-remote",
- "https://github.com/nexus-main/nexus-sources-remote")]
+ "https://github.com/nexus-main/nexus-sources-remote")]
public partial class Remote : IDataSource, IDisposable
{
- #region Fields
+ private const int DEFAULT_AGENT_PORT = 56145;
private ReadDataHandler? _readData;
private static readonly int API_LEVEL = 1;
private RemoteCommunicator _communicator = default!;
private IJsonRpcServer _rpcServer = default!;
- #endregion
-
- #region Properties
-
/* Possible features to be implemented for this data source:
*
* Transports:
@@ -45,10 +41,6 @@ public partial class Remote : IDataSource, IDisposable
private DataSourceContext Context { get; set; } = default!;
- #endregion
-
- #region Methods
-
public async Task SetContextAsync(
DataSourceContext context,
ILogger logger,
@@ -56,61 +48,48 @@ public async Task SetContextAsync(
{
Context = context;
- // mode
- var mode = Context.SourceConfiguration?.GetStringValue("mode") ?? "tcp";
+ // Endpoint
+ if (context.ResourceLocator is null || context.ResourceLocator.Scheme != "tcp")
+ throw new ArgumentException("The resource locator parameter URI must be set with the 'tcp' scheme.");
- if (mode != "tcp")
- throw new NotSupportedException($"The mode {mode} is not supported.");
+ var endpointString = context.ResourceLocator.ToString().Replace("tcp://", "").Replace("/", "");
+ var ipAddress = default(IPAddress);
- // listen-address
- var listenAddressString = Context.SourceConfiguration?.GetStringValue("listen-address") ?? "0.0.0.0";
+ if (!IPEndPoint.TryParse(endpointString, out var endpoint) && !IPAddress.TryParse(endpointString, out ipAddress))
+ throw new ArgumentException("The resource locator parameter is not a valid IP endpoint.");
- if (!IPAddress.TryParse(listenAddressString, out var listenAddress))
- throw new ArgumentException("The listen-address parameter is not a valid IP-Address.");
+ if (endpoint is null)
+ endpoint = new IPEndPoint(ipAddress!, DEFAULT_AGENT_PORT);
- // listen-port
- var listenPortMin = Context.SourceConfiguration?.GetIntValue("listen-port-min") ?? 49152;
+ // Type
+ var type = Context.SourceConfiguration?.GetStringValue("type") ?? throw new Exception("The data source type is missing.");
- if (!(1 <= listenPortMin && listenPortMin < 65536))
- throw new ArgumentException("The listen-port-min parameter is invalid.");
+ // Resource locator
+ var resourceLocatorString = Context.SourceConfiguration?.GetStringValue("resourceLocator");
- var listenPortMax = Context.SourceConfiguration?.GetIntValue("listen-port-max") ?? 65536;
+ if (!Uri.TryCreate(resourceLocatorString, UriKind.Absolute, out var resourceLocator))
+ throw new ArgumentException("The resource locator parameter is not a valid URI.");
- if (!(1 <= listenPortMin && listenPortMin < 65536))
- throw new ArgumentException("The listen-port-max parameter is invalid.");
+ // Source configuration
+ var sourceConfigurationJsonElement = Context.SourceConfiguration?.GetValueOrDefault("sourceConfiguration");
- // template
- var templateId = (Context.SourceConfiguration?.GetStringValue("template")) ?? throw new KeyNotFoundException("The template parameter must be provided.");
+ var sourceConfiguration = sourceConfigurationJsonElement.HasValue && sourceConfigurationJsonElement.Value.ValueKind == JsonValueKind.Object
- // environment variables
- var requestConfiguration = Context.SourceConfiguration!;
- var environmentVariables = new Dictionary();
+ ? JsonSerializer
+ .Deserialize>(sourceConfigurationJsonElement.Value)
- if (requestConfiguration.TryGetValue("environment-variables", out var propertyValue) &&
- propertyValue.ValueKind == JsonValueKind.Object)
- {
- var environmentVariablesRaw = propertyValue.Deserialize>();
-
- if (environmentVariablesRaw is not null)
- environmentVariables = environmentVariablesRaw
- .Where(entry => entry.Value.ValueKind == JsonValueKind.String)
- .ToDictionary(entry => entry.Key, entry => entry.Value.GetString() ?? "");
- }
-
- // Build command
- var actualCommand = BuildCommand(templateId);
-
- //
- var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1));
+ : JsonSerializer
+ .Deserialize>("{}");
+ // Remote communicator
_communicator = new RemoteCommunicator(
- actualCommand,
- environmentVariables,
- listenAddress,
- listenPortMin,
- listenPortMax,
+ endpoint,
HandleReadDataAsync,
- logger);
+ logger
+ );
+
+ var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1));
+ cancellationToken.Register(timeoutTokenSource.Cancel);
_rpcServer = await _communicator.ConnectAsync(timeoutTokenSource.Token);
@@ -119,12 +98,18 @@ public async Task SetContextAsync(
if (apiVersion < 1 || apiVersion > API_LEVEL)
throw new Exception($"The API level '{apiVersion}' is not supported.");
+ // Set context
logger.LogTrace("Set context to remote client");
- await _rpcServer
- .SetContextAsync(context, timeoutTokenSource.Token);
+ var subContext = new DataSourceContext(
+ resourceLocator,
+ context.SystemConfiguration,
+ sourceConfiguration,
+ context.RequestConfiguration
+ );
- logger.LogDebug("Done preparing remote client");
+ await _rpcServer
+ .SetContextAsync(type, subContext, timeoutTokenSource.Token);
}
public async Task GetCatalogRegistrationsAsync(
@@ -132,7 +117,7 @@ public async Task GetCatalogRegistrationsAsync(
CancellationToken cancellationToken)
{
var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1));
- cancellationToken.Register(() => timeoutTokenSource.Cancel());
+ cancellationToken.Register(timeoutTokenSource.Cancel);
var response = await _rpcServer
.GetCatalogRegistrationsAsync(path, timeoutTokenSource.Token);
@@ -140,15 +125,15 @@ public async Task GetCatalogRegistrationsAsync(
return response.Registrations;
}
- public async Task GetCatalogAsync(
- string catalogId,
+ public async Task EnrichCatalogAsync(
+ ResourceCatalog catalog,
CancellationToken cancellationToken)
{
var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1));
- cancellationToken.Register(() => timeoutTokenSource.Cancel());
+ cancellationToken.Register(timeoutTokenSource.Cancel);
var response = await _rpcServer
- .GetCatalogAsync(catalogId, timeoutTokenSource.Token);
+ .EnrichCatalogAsync(catalog, timeoutTokenSource.Token);
return response.Catalog;
}
@@ -158,7 +143,7 @@ public async Task GetCatalogAsync(
CancellationToken cancellationToken)
{
var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1));
- cancellationToken.Register(() => timeoutTokenSource.Cancel());
+ cancellationToken.Register(timeoutTokenSource.Cancel);
var response = await _rpcServer
.GetTimeRangeAsync(catalogId, timeoutTokenSource.Token);
@@ -176,7 +161,7 @@ public async Task GetAvailabilityAsync(
CancellationToken cancellationToken)
{
var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1));
- cancellationToken.Register(() => timeoutTokenSource.Cancel());
+ cancellationToken.Register(timeoutTokenSource.Cancel);
var response = await _rpcServer
.GetAvailabilityAsync(catalogId, begin, end, timeoutTokenSource.Token);
@@ -198,17 +183,17 @@ public async Task ReadAsync(
{
var counter = 0.0;
- foreach (var (catalogItem, data, status) in requests)
+ foreach (var (originalResourceName, catalogItem, data, status) in requests)
{
cancellationToken.ThrowIfCancellationRequested();
var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1));
- cancellationToken.Register(() => timeoutTokenSource.Cancel());
+ cancellationToken.Register(timeoutTokenSource.Cancel);
var elementCount = data.Length / catalogItem.Representation.ElementSize;
await _rpcServer
- .ReadSingleAsync(begin, end, catalogItem, timeoutTokenSource.Token);
+ .ReadSingleAsync(begin, end, originalResourceName, catalogItem, timeoutTokenSource.Token);
await _communicator.ReadRawAsync(data, timeoutTokenSource.Token);
await _communicator.ReadRawAsync(status, timeoutTokenSource.Token);
@@ -222,20 +207,6 @@ await _rpcServer
}
}
- private string BuildCommand(string templateId)
- {
- var template = (Context.SystemConfiguration?
- .GetStringValue($"{typeof(Remote).FullName}/templates/{templateId}")) ?? throw new Exception($"The template {templateId} does not exist.");
- var command = CommandRegex().Replace(template, match =>
- {
- var parameterKey = match.Groups[1].Value;
- var parameterValue = (Context.SourceConfiguration?.GetStringValue(parameterKey)) ?? throw new Exception($"The {parameterKey} parameter must be provided.");
- return parameterValue;
- });
-
- return command;
- }
-
// copy from Nexus -> DataModelUtilities
private static readonly Regex _resourcePathEvaluator = MyRegex();
@@ -256,9 +227,9 @@ private async Task HandleReadDataAsync(string resourcePath, DateTime begin, Date
if (!match.Success)
throw new Exception("Invalid resource path");
- var samplePeriod = (TimeSpan)_toSamplePeriodMethodInfo.Invoke(null, new object[] {
+ var samplePeriod = (TimeSpan)_toSamplePeriodMethodInfo.Invoke(null, [
match.Groups["sample_period"].Value
- })!;
+ ])!;
// find buffer length and rent buffer
var length = (int)((end - begin).Ticks / samplePeriod.Ticks);
@@ -282,8 +253,6 @@ private static CancellationTokenSource GetTimeoutTokenSource(TimeSpan timeout)
return timeoutToken;
}
- #endregion
-
#region IDisposable
private bool _disposedValue;
diff --git a/src/Nexus.Sources.Remote/RemoteCommunicator.cs b/src/Nexus.Sources.Remote/RemoteCommunicator.cs
index af146e5..7e7d97f 100644
--- a/src/Nexus.Sources.Remote/RemoteCommunicator.cs
+++ b/src/Nexus.Sources.Remote/RemoteCommunicator.cs
@@ -1,9 +1,6 @@
-using System.Diagnostics;
-using System.Net;
-using System.Net.NetworkInformation;
+using System.Net;
using System.Net.Sockets;
using System.Text;
-using System.Text.RegularExpressions;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Serialization;
@@ -11,204 +8,111 @@
namespace Nexus.Sources;
-internal partial class RemoteCommunicator
+internal class RemoteCommunicator
{
- #region Fields
+ private readonly IPEndPoint _endpoint;
- private static readonly object _lock = new();
- private static int _nextMin = -1;
- private readonly TcpListener _tcpListener;
- private Stream _commStream = default!;
- private Stream _dataStream = default!;
- private IJsonRpcServer _rpcServer = default!;
+ private readonly TcpClient _comm = new();
- private readonly ILogger _logger;
- private readonly Func _readData;
+ private readonly TcpClient _data = new();
- private readonly string _command;
- private readonly string _arguments;
- private readonly Dictionary _environmentVariables;
+ private NetworkStream? _commStream;
- private Process _process = default!;
+ private NetworkStream? _dataStream;
- #endregion
+ private IJsonRpcServer _rpcServer = default!;
- #region Constructors
+ private readonly ILogger _logger;
+
+ private readonly Func _readData;
public RemoteCommunicator(
- string command,
- Dictionary environmentVariables,
- IPAddress listenAddress,
- int listenPortMin,
- int listenPortMax,
+ IPEndPoint endpoint,
Func readData,
- ILogger logger)
+ ILogger logger
+ )
{
- _environmentVariables = environmentVariables;
+ _endpoint = endpoint;
_readData = readData;
_logger = logger;
-
- var listenPort = GetNextUnusedPort(listenPortMin, listenPortMax);
-
- command = CommandRegex().Replace(command, listenPort.ToString());
- var commandParts = command.Split(" ", count: 2);
- _command = commandParts[0];
-
- _arguments = commandParts.Length == 2
- ? commandParts[1]
- : "";
-
- _tcpListener = new TcpListener(listenAddress, listenPort);
- _tcpListener.Start();
}
- #endregion
-
- #region Methods
-
public async Task ConnectAsync(CancellationToken cancellationToken)
{
- try
- {
- cancellationToken.Register(_tcpListener.Stop);
-
- // start process
- _logger.LogDebug("Start process.");
-
- var psi = new ProcessStartInfo(_command)
- {
- Arguments = _arguments,
- };
-
- foreach (var variable in _environmentVariables)
- {
- psi.EnvironmentVariables[variable.Key] = variable.Value;
- }
-
- psi.RedirectStandardOutput = true;
- psi.RedirectStandardError = true;
+ var id = Guid.NewGuid().ToString();
- _process = new Process() { StartInfo = psi };
- _process.Start();
+ // comm connection
+ await _comm.ConnectAsync(_endpoint, cancellationToken);
+ _commStream = _comm.GetStream();
- _process.OutputDataReceived += (sender, e) =>
- {
- if (!string.IsNullOrWhiteSpace(e.Data))
- _logger.LogDebug("{Message}", e.Data);
- };
-
- _process.ErrorDataReceived += (sender, e) =>
- {
- if (!string.IsNullOrWhiteSpace(e.Data))
- _logger.LogWarning("{Message}", e.Data);
- };
-
- _process.BeginOutputReadLine();
- _process.BeginErrorReadLine();
-
- // wait for clients to connect
- _logger.LogDebug("Wait for clients to connect.");
-
- var filters = new string[] { "comm", "data" };
+ await _commStream.WriteAsync(Encoding.UTF8.GetBytes(id), cancellationToken);
+ await _commStream.WriteAsync(Encoding.UTF8.GetBytes("comm"), cancellationToken);
+ await _commStream.FlushAsync(cancellationToken);
- Stream? commStream = default;
- Stream? dataStream = default;
-
- for (int i = 0; i < 2; i++)
- {
- var (identifier, client) = await GetTcpClientAsync(filters, cancellationToken);
-
- if (commStream is null && identifier == "comm")
- commStream = client.GetStream();
-
- else if (dataStream is null && identifier == "data")
- dataStream = client.GetStream();
- }
+ // data connection
+ await _data.ConnectAsync(_endpoint, cancellationToken);
+ _dataStream = _data.GetStream();
+
+ await _dataStream.WriteAsync(Encoding.UTF8.GetBytes(id), cancellationToken);
+ await _dataStream.WriteAsync(Encoding.UTF8.GetBytes("data"), cancellationToken);
+ await _dataStream.FlushAsync(cancellationToken);
- if (commStream is null || dataStream is null)
- throw new Exception("The RPC server did not connect properly via communication and a data stream. This may indicate that other TCP clients have tried to connect.");
-
- _commStream = commStream;
- _dataStream = dataStream;
-
- var formatter = new JsonMessageFormatter()
- {
- JsonSerializer = {
- ContractResolver = new DefaultContractResolver
- {
- NamingStrategy = new CamelCaseNamingStrategy()
- }
+ var formatter = new JsonMessageFormatter()
+ {
+ JsonSerializer = {
+ ContractResolver = new DefaultContractResolver
+ {
+ NamingStrategy = new CamelCaseNamingStrategy()
}
- };
+ }
+ };
- formatter.JsonSerializer.Converters.Add(new JsonElementConverter());
- formatter.JsonSerializer.Converters.Add(new StringEnumConverter());
+ formatter.JsonSerializer.Converters.Add(new JsonElementConverter());
+ formatter.JsonSerializer.Converters.Add(new StringEnumConverter());
- var messageHandler = new LengthHeaderMessageHandler(commStream, commStream, formatter);
- var jsonRpc = new JsonRpc(messageHandler);
+ var messageHandler = new LengthHeaderMessageHandler(_commStream, _commStream, formatter);
+ var jsonRpc = new JsonRpc(messageHandler);
- jsonRpc.AddLocalRpcMethod("log", new Action((logLevel, message) =>
- {
- _logger.Log(logLevel, "{Message}", message);
- }));
+ jsonRpc.AddLocalRpcMethod("log", new Action((logLevel, message) =>
+ {
+ _logger.Log(logLevel, "{Message}", message);
+ }));
- jsonRpc.AddLocalRpcMethod("readData", _readData);
- jsonRpc.StartListening();
+ jsonRpc.AddLocalRpcMethod("readData", _readData);
+ jsonRpc.StartListening();
- _rpcServer = jsonRpc.Attach(new JsonRpcProxyOptions()
- {
- MethodNameTransform = pascalCaseAsyncName =>
- {
- return char.ToLower(pascalCaseAsyncName[0]) + pascalCaseAsyncName[1..].Replace("Async", string.Empty);
- }
- });
-
- return _rpcServer;
- }
- catch
+ _rpcServer = jsonRpc.Attach(new JsonRpcProxyOptions()
{
- try
+ MethodNameTransform = pascalCaseAsyncName =>
{
- _process?.Kill();
- }
- catch
- {
- //
+ return char.ToLower(pascalCaseAsyncName[0]) + pascalCaseAsyncName[1..].Replace("Async", string.Empty);
}
+ });
- throw;
- }
- finally
- {
- _tcpListener.Stop();
- }
+ return _rpcServer;
}
- public Task ReadRawAsync(Memory buffer, CancellationToken cancellationToken)
+ public ValueTask ReadRawAsync(Memory buffer, CancellationToken cancellationToken)
{
- return InternalReadRawAsync(buffer, _dataStream, cancellationToken);
- }
+ if (_dataStream is null)
+ throw new Exception("You need to connect before read any data");
- private static async Task InternalReadRawAsync(Memory buffer, Stream source, CancellationToken cancellationToken)
- {
- while (buffer.Length > 0)
- {
- cancellationToken.ThrowIfCancellationRequested();
- var readCount = await source.ReadAsync(buffer, cancellationToken);
-
- if (readCount == 0)
- throw new Exception("The TCP connection closed early.");
-
- buffer = buffer[readCount..];
- }
+ return _dataStream.ReadExactlyAsync(buffer, cancellationToken);
}
public Task WriteRawAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken)
{
+ if (_dataStream is null)
+ throw new Exception("You need to connect before write any data");
+
return InternalWriteRawAsync(buffer, _dataStream, cancellationToken);
}
- private static async Task InternalWriteRawAsync(ReadOnlyMemory buffer, Stream target, CancellationToken cancellationToken)
+ private static async Task InternalWriteRawAsync(
+ ReadOnlyMemory buffer,
+ Stream target,
+ CancellationToken cancellationToken
+ )
{
var length = BitConverter.GetBytes(buffer.Length).Reverse().ToArray();
@@ -217,58 +121,7 @@ private static async Task InternalWriteRawAsync(ReadOnlyMemory buffer, Str
await target.FlushAsync(cancellationToken);
}
- private static int GetNextUnusedPort(int min, int max)
- {
- lock (_lock)
- {
- min = Math.Max(_nextMin, min);
-
- if (max <= min)
- throw new ArgumentException("Max port cannot be less than or equal to min.");
-
- var ipProperties = IPGlobalProperties.GetIPGlobalProperties();
-
- var usedPorts =
- ipProperties.GetActiveTcpConnections()
- .Where(connection => connection.State != TcpState.Closed)
- .Select(connection => connection.LocalEndPoint)
- .Concat(ipProperties.GetActiveTcpListeners())
- .Select(endpoint => endpoint.Port)
- .ToArray();
-
- var firstUnused =
- Enumerable.Range(min, max - min)
- .Where(port => !usedPorts.Contains(port))
- .Select(port => new int?(port))
- .FirstOrDefault();
-
- if (!firstUnused.HasValue)
- throw new Exception($"All TCP ports in the range of {min}..{max} are currently in use.");
-
- _nextMin = (firstUnused.Value + 1) % max;
- return firstUnused.Value;
- }
- }
-
- private async Task<(string Identifier, TcpClient Client)> GetTcpClientAsync(string[] filters, CancellationToken cancellationToken)
- {
- var buffer = new byte[4];
- var client = await _tcpListener.AcceptTcpClientAsync(cancellationToken);
-
- await InternalReadRawAsync(buffer, client.GetStream(), cancellationToken);
-
- foreach (var filter in filters)
- {
- if (buffer.SequenceEqual(Encoding.UTF8.GetBytes(filter)))
- return (filter, client);
- }
-
- throw new Exception("Invalid stream identifier received.");
- }
-
- #endregion
-
- #region IDisposable
+#region IDisposable
private bool _disposedValue;
@@ -278,29 +131,11 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
- try
- {
- var disposable = _rpcServer as IDisposable;
- disposable?.Dispose();
-
- _commStream?.Dispose();
- _dataStream?.Dispose();
-
- try
- {
- _process?.Kill();
- }
- catch
- {
- //
- }
- }
- catch (Exception)
- {
- //
- }
+ var disposable = _rpcServer as IDisposable;
+ disposable?.Dispose();
- //_process?.WaitForExitAsync();
+ _commStream?.Dispose();
+ _dataStream?.Dispose();
}
_disposedValue = true;
@@ -312,9 +147,6 @@ public void Dispose()
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
-
- [GeneratedRegex("{remote-port}")]
- private static partial Regex CommandRegex();
-
- #endregion
-}
+
+#endregion
+}
\ No newline at end of file
diff --git a/src/remoting/dotnet-remoting/Remoting.cs b/src/remoting/dotnet-remoting/Remoting.cs
index 0288ee4..76adcfa 100644
--- a/src/remoting/dotnet-remoting/Remoting.cs
+++ b/src/remoting/dotnet-remoting/Remoting.cs
@@ -5,18 +5,17 @@
using System.Globalization;
using System.Net.Sockets;
using System.Runtime.InteropServices;
-using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Text.Json.Serialization;
namespace Nexus.Remoting;
-internal class Logger(NetworkStream tcpCommSocketStream) : ILogger
+internal class Logger(NetworkStream commStream) : ILogger
{
- private readonly NetworkStream _tcpCommSocketStream = tcpCommSocketStream;
+ private readonly NetworkStream _commStream = commStream;
- public IDisposable BeginScope(TState state)
+ public IDisposable? BeginScope(TState state) where TState : notnull
{
throw new NotImplementedException("Scopes are not supported on this logger.");
}
@@ -26,7 +25,13 @@ public bool IsEnabled(LogLevel logLevel)
return true;
}
- public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter)
+ public void Log(
+ LogLevel logLevel,
+ EventId eventId,
+ TState state,
+ Exception? exception,
+ Func formatter
+ )
{
var notification = new JsonObject()
{
@@ -35,7 +40,7 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except
["params"] = new JsonArray(logLevel.ToString(), formatter(state, exception))
};
- _ = Utilities.SendToServerAsync(notification, _tcpCommSocketStream);
+ _ = Utilities.SendToServerAsync(notification, _commStream);
}
}
@@ -44,40 +49,39 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except
///
public class RemoteCommunicator
{
- private readonly string _address;
- private readonly int _port;
- private readonly TcpClient _tcpCommSocket;
- private readonly TcpClient _tcpDataSocket;
- private NetworkStream _tcpCommSocketStream = default!;
- private NetworkStream _tcpDataSocketStream = default!;
- private readonly IDataSource _dataSource;
+ private readonly NetworkStream _commStream;
+
+ private readonly NetworkStream _dataStream;
+
+ private readonly Func _getDataSource;
+
private ILogger _logger = default!;
+ private IDataSource? _dataSource = default;
+
///
- /// Initializes a new instance of the RemoteCommunicator.
+ /// Initializes a new instance of the .
///
- /// The data source
- /// The address to connect to
- /// The port to connect to
- public RemoteCommunicator(IDataSource dataSource, string address, int port)
+ /// The network stream for communications.
+ /// The network stream for data.
+ /// A func to get a new data source instance by its type name.
+ public RemoteCommunicator(
+ NetworkStream commStream,
+ NetworkStream dataStream,
+ Func getDataSource
+ )
{
- _address = address;
- _port = port;
-
- _tcpCommSocket = new TcpClient();
- _tcpDataSocket = new TcpClient();
+ _commStream = commStream;
+ _dataStream = dataStream;
- if (!(0 < port && port < 65536))
- throw new Exception($"The port {port} is not a valid port number.");
-
- _dataSource = dataSource;
+ _getDataSource = getDataSource;
}
///
/// Starts the remoting operation.
///
///
- public async Task RunAsync()
+ public Task RunAsync()
{
static JsonElement Read(Span jsonRequest)
{
@@ -85,97 +89,92 @@ static JsonElement Read(Span jsonRequest)
return JsonSerializer.Deserialize(ref reader, Utilities.Options);
}
- // comm connection
- await _tcpCommSocket.ConnectAsync(_address, _port);
- _tcpCommSocketStream = _tcpCommSocket.GetStream();
- await _tcpCommSocketStream.WriteAsync(Encoding.UTF8.GetBytes("comm"));
- await _tcpCommSocketStream.FlushAsync();
-
- // data connection
- await _tcpDataSocket.ConnectAsync(_address, _port);
- _tcpDataSocketStream = _tcpDataSocket.GetStream();
- await _tcpDataSocketStream.WriteAsync(Encoding.UTF8.GetBytes("data"));
- await _tcpDataSocketStream.FlushAsync();
-
- // loop
- while (true)
+ /* Make this method async as early as possible to not block the calling method.
+ * Otherwise new clients cannot connect because the call to ReadSize may block
+ * forever, preventing the Lock to be released.
+ */
+ return Task.Run(async () =>
{
- // https://www.jsonrpc.org/specification
+ // loop
+ while (true)
+ {
+ // https://www.jsonrpc.org/specification
- // get request message
- var size = ReadSize(_tcpCommSocketStream);
+ // get request message
+ var size = ReadSize(_commStream);
- using var memoryOwner = MemoryPool.Shared.Rent(size);
- var messageMemory = memoryOwner.Memory[..size];
+ using var memoryOwner = MemoryPool.Shared.Rent(size);
+ var messageMemory = memoryOwner.Memory[..size];
- _tcpCommSocketStream.ReadExactly(messageMemory.Span, _logger);
- var request = Read(messageMemory.Span);
+ _commStream.InternalReadExactly(messageMemory.Span);
+ var request = Read(messageMemory.Span);
- // process message
- Memory data = default;
- Memory status = default;
- JsonObject? response;
+ // process message
+ Memory data = default;
+ Memory status = default;
+ JsonObject? response;
- if (request.TryGetProperty("jsonrpc", out var element) &&
- element.ValueKind == JsonValueKind.String &&
- element.GetString() == "2.0")
- {
- if (request.TryGetProperty("id", out var _))
+ if (request.TryGetProperty("jsonrpc", out var element) &&
+ element.ValueKind == JsonValueKind.String &&
+ element.GetString() == "2.0")
{
- try
+ if (request.TryGetProperty("id", out var _))
{
- (var result, data, status) = await ProcessInvocationAsync(request);
+ try
+ {
+ (var result, data, status) = await ProcessInvocationAsync(request);
- response = new JsonObject()
+ response = new JsonObject()
+ {
+ ["result"] = result
+ };
+ }
+ catch (Exception ex)
{
- ["result"] = result
- };
+ response = new JsonObject()
+ {
+ ["error"] = new JsonObject()
+ {
+ ["code"] = -1,
+ ["message"] = ex.ToString()
+ }
+ };
+ }
}
- catch (Exception ex)
+ else
{
- response = new JsonObject()
- {
- ["error"] = new JsonObject()
- {
- ["code"] = -1,
- ["message"] = ex.ToString()
- }
- };
+ throw new Exception($"JSON-RPC 2.0 notifications are not supported.");
}
}
else
{
- throw new Exception($"JSON-RPC 2.0 notifications are not supported.");
+ throw new Exception($"JSON-RPC 2.0 message expected, but got something else.");
}
- }
- else
- {
- throw new Exception($"JSON-RPC 2.0 message expected, but got something else.");
- }
- response.Add("jsonrpc", "2.0");
+ response.Add("jsonrpc", "2.0");
- string? id;
+ string? id;
- if (request.TryGetProperty("id", out var element2))
- id = element2.ToString();
+ if (request.TryGetProperty("id", out var element2))
+ id = element2.ToString();
- else
- throw new Exception("Unable to read the request message id.");
+ else
+ throw new Exception("Unable to read the request message id.");
- response.Add("id", id);
+ response.Add("id", id);
- // send response
- await Utilities.SendToServerAsync(response, _tcpCommSocketStream);
+ // send response
+ await Utilities.SendToServerAsync(response, _commStream);
- // send data
- if (!data.Equals(default) && !status.Equals(default))
- {
- await _tcpDataSocketStream.WriteAsync(data);
- await _tcpDataSocketStream.WriteAsync(status);
- await _tcpDataSocketStream.FlushAsync();
+ // send data
+ if (!data.Equals(default) && !status.Equals(default))
+ {
+ await _dataStream.WriteAsync(data);
+ await _dataStream.WriteAsync(status);
+ await _dataStream.FlushAsync();
+ }
}
- }
+ });
}
private async Task<(JsonObject?, Memory, Memory)> ProcessInvocationAsync(JsonElement request)
@@ -199,9 +198,12 @@ static JsonElement Read(Span jsonRequest)
else if (methodName == "setContext")
{
- var rawContext = @params[0];
+ var type = @params[0].ToString();
+ var rawContext = @params[1];
var resourceLocator = default(Uri?);
+ _dataSource = _getDataSource(type);
+
if (rawContext.TryGetProperty("resourceLocator", out var value))
resourceLocator = new Uri(value.GetString()!);
@@ -223,7 +225,7 @@ static JsonElement Read(Span jsonRequest)
if (rawContext.TryGetProperty("requestConfiguration", out var requestConfigurationElement))
requestConfiguration = JsonSerializer.Deserialize?>(requestConfigurationElement);
- _logger = new Logger(_tcpCommSocketStream);
+ _logger = new Logger(_commStream);
var context = new DataSourceContext(
resourceLocator,
@@ -237,6 +239,9 @@ static JsonElement Read(Span jsonRequest)
else if (methodName == "getCatalogRegistrations")
{
+ if (_dataSource is null)
+ throw new Exception("The data source context must be set before invoking other methods.");
+
var path = @params[0].GetString()!;
var registrations = await _dataSource.GetCatalogRegistrationsAsync(path, CancellationToken.None);
@@ -246,10 +251,13 @@ static JsonElement Read(Span jsonRequest)
};
}
- else if (methodName == "getCatalog")
+ else if (methodName == "enrichCatalog")
{
- var catalogId = @params[0].GetString()!;
- var catalog = await _dataSource.GetCatalogAsync(catalogId, CancellationToken.None);
+ if (_dataSource is null)
+ throw new Exception("The data source context must be set before invoking other methods.");
+
+ var originalCatalog = JsonSerializer.Deserialize(@params[0], Utilities.Options)!;
+ var catalog = await _dataSource.EnrichCatalogAsync(originalCatalog, CancellationToken.None);
result = new JsonObject()
{
@@ -259,6 +267,9 @@ static JsonElement Read(Span jsonRequest)
else if (methodName == "getTimeRange")
{
+ if (_dataSource is null)
+ throw new Exception("The data source context must be set before invoking other methods.");
+
var catalogId = @params[0].GetString()!;
var (begin, end) = await _dataSource.GetTimeRangeAsync(catalogId, CancellationToken.None);
@@ -271,6 +282,9 @@ static JsonElement Read(Span jsonRequest)
else if (methodName == "getAvailability")
{
+ if (_dataSource is null)
+ throw new Exception("The data source context must be set before invoking other methods.");
+
var catalogId = @params[0].GetString()!;
var beginString = @params[1].GetString()!;
@@ -289,15 +303,20 @@ static JsonElement Read(Span jsonRequest)
else if (methodName == "readSingle")
{
+ if (_dataSource is null)
+ throw new Exception("The data source context must be set before invoking other methods.");
+
var beginString = @params[0].GetString()!;
var begin = DateTime.ParseExact(beginString, "yyyy-MM-ddTHH:mm:ssZ", CultureInfo.InvariantCulture).ToUniversalTime();
var endString = @params[1].GetString()!;
var end = DateTime.ParseExact(endString, "yyyy-MM-ddTHH:mm:ssZ", CultureInfo.InvariantCulture).ToUniversalTime();
- var catalogItem = JsonSerializer.Deserialize(@params[2], Utilities.Options)!;
+ var originalResourceName = @params[2].GetString()!;
+
+ var catalogItem = JsonSerializer.Deserialize(@params[3], Utilities.Options)!;
(data, status) = ExtensibilityUtilities.CreateBuffers(catalogItem.Representation, begin, end);
- var readRequest = new ReadRequest(catalogItem, data, status);
+ var readRequest = new ReadRequest(originalResourceName, catalogItem, data, status);
await _dataSource.ReadAsync(
begin,
@@ -349,22 +368,22 @@ private async Task HandleReadDataAsync(
_logger.LogDebug("Read resource path {ResourcePath} from Nexus", resourcePath);
- await Utilities.SendToServerAsync(readDataRequest, _tcpCommSocketStream);
+ await Utilities.SendToServerAsync(readDataRequest, _commStream);
- var size = ReadSize(_tcpDataSocketStream);
+ var size = ReadSize(_dataStream);
if (size != buffer.Length * sizeof(double))
throw new Exception("Data returned by Nexus have an unexpected length");
_logger.LogTrace("Try to read {ByteCount} bytes from Nexus", size);
- _tcpDataSocketStream.ReadExactly(MemoryMarshal.AsBytes(buffer.Span), _logger);
+ _dataStream.InternalReadExactly(MemoryMarshal.AsBytes(buffer.Span));
}
private int ReadSize(NetworkStream currentStream)
{
Span sizeBuffer = stackalloc byte[4];
- currentStream.ReadExactly(sizeBuffer, _logger);
+ currentStream.InternalReadExactly(sizeBuffer);
MemoryExtensions.Reverse(sizeBuffer);
var size = BitConverter.ToInt32(sizeBuffer);
@@ -390,7 +409,7 @@ static Utilities()
public static async Task SendToServerAsync(JsonNode response, NetworkStream currentStream)
{
- var encodedResponse = JsonSerializer.SerializeToUtf8Bytes(response, Utilities.Options);
+ var encodedResponse = JsonSerializer.SerializeToUtf8Bytes(response, Options);
var messageLength = BitConverter.GetBytes(encodedResponse.Length);
Array.Reverse(messageLength);
@@ -411,17 +430,14 @@ public static async Task SendToServerAsync(JsonNode response, NetworkStream curr
internal static class StreamExtensions
{
- public static void ReadExactly(this Stream stream, Span buffer, ILogger logger)
+ public static void InternalReadExactly(this Stream stream, Span buffer)
{
while (buffer.Length > 0)
{
var read = stream.Read(buffer);
if (read == 0)
- {
- logger.LogDebug("No data from Nexus received (exiting)");
- Environment.Exit(0);
- }
+ throw new Exception("The stream has been closed");
buffer = buffer[read..];
}
diff --git a/src/remoting/dotnet-remoting/dotnet-remoting.csproj b/src/remoting/dotnet-remoting/dotnet-remoting.csproj
index 702a3cb..2dc0726 100644
--- a/src/remoting/dotnet-remoting/dotnet-remoting.csproj
+++ b/src/remoting/dotnet-remoting/dotnet-remoting.csproj
@@ -26,7 +26,7 @@
-
+
diff --git a/src/remoting/python-remoting/nexus_remoting/_remoting.py b/src/remoting/python-remoting/nexus_remoting/_remoting.py
index cce7431..87a00b3 100644
--- a/src/remoting/python-remoting/nexus_remoting/_remoting.py
+++ b/src/remoting/python-remoting/nexus_remoting/_remoting.py
@@ -149,7 +149,9 @@ async def _process_invocation(self, request: dict[str, Any]) \
elif method_name == "setContext":
- raw_context = params[0]
+ # TODO: make use of the type (see C# implementation)
+ type = params[0]
+ raw_context = params[1]
resource_locator_string = cast(str, raw_context["resourceLocator"]) if "resourceLocator" in raw_context else None
resource_locator = None if resource_locator_string is None else urlparse(resource_locator_string)
@@ -215,9 +217,10 @@ async def _process_invocation(self, request: dict[str, Any]) \
begin = datetime.strptime(params[0], "%Y-%m-%dT%H:%M:%SZ")
end = datetime.strptime(params[1], "%Y-%m-%dT%H:%M:%SZ")
- catalog_item = JsonEncoder.decode(CatalogItem, params[2], _json_encoder_options)
+ original_resource_name = params[2]
+ catalog_item = JsonEncoder.decode(CatalogItem, params[3], _json_encoder_options)
(data, status) = ExtensibilityUtilities.create_buffers(catalog_item.representation, begin, end)
- read_request = ReadRequest(catalog_item, data, status)
+ read_request = ReadRequest(original_resource_name, catalog_item, data, status)
await self._data_source.read(
begin,
diff --git a/tests/Nexus.Sources.Remote.Tests/Nexus.Sources.Remote.Tests.csproj b/tests/Nexus.Sources.Remote.Tests/Nexus.Sources.Remote.Tests.csproj
index 390ee5d..446368f 100644
--- a/tests/Nexus.Sources.Remote.Tests/Nexus.Sources.Remote.Tests.csproj
+++ b/tests/Nexus.Sources.Remote.Tests/Nexus.Sources.Remote.Tests.csproj
@@ -9,22 +9,22 @@
-
+
-
-
-
-
-
+
+
+
+
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
diff --git a/tests/Nexus.Sources.Remote.Tests/RemoteTests.cs b/tests/Nexus.Sources.Remote.Tests/RemoteTests.cs
index 5403e81..57c41fa 100644
--- a/tests/Nexus.Sources.Remote.Tests/RemoteTests.cs
+++ b/tests/Nexus.Sources.Remote.Tests/RemoteTests.cs
@@ -10,27 +10,26 @@
namespace Nexus.Sources.Tests;
-[Trait("TestCategory", "local")]
-public class RemoteTests
+public class RemoteTests(RemoteTestsFixture fixture)
+ : IClassFixture
{
- [Theory]
- [InlineData("dotnet run --project ../../../../tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj localhost {remote-port}")]
- [InlineData("python python/remote.py localhost {remote-port}")]
-#if LINUX
- [InlineData("bash bash/remote.sh localhost {remote-port}")]
-#endif
- public async Task ProvidesCatalog(string command)
+ private readonly RemoteTestsFixture _fixture = fixture;
+
+ [Fact]
+ public async Task ProvidesCatalog()
{
- // arrange
+ await _fixture.Initialize;
+
+ // Arrange
var dataSource = new Remote() as IDataSource;
- var context = CreateContext(command);
+ var context = CreateContext();
await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None);
- // act
- var actual = await dataSource.GetCatalogAsync("/A/B/C", CancellationToken.None);
+ // Act
+ var actual = await dataSource.EnrichCatalogAsync(new ResourceCatalog("/A/B/C"), CancellationToken.None);
- // assert
+ // Assert
var actualProperties1 = actual.Properties;
var actualIds = actual.Resources!.Select(resource => resource.Id).ToList();
var actualUnits = actual.Resources!.Select(resource => resource.Properties?.GetStringValue("unit")).ToList();
@@ -50,16 +49,13 @@ public async Task ProvidesCatalog(string command)
Assert.True(expectedDataTypes.SequenceEqual(actualDataTypes));
}
- [Theory]
- [InlineData("dotnet run --project ../../../../tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj localhost {remote-port}")]
- [InlineData("python python/remote.py localhost {remote-port}")]
-#if LINUX
- [InlineData("bash bash/remote.sh localhost {remote-port}")]
-#endif
- public async Task CanProvideTimeRange(string command)
+[Fact]
+ public async Task CanProvideTimeRange()
{
+ await _fixture.Initialize;
+
var dataSource = new Remote() as IDataSource;
- var context = CreateContext(command);
+ var context = CreateContext();
var expectedBegin = new DateTime(2019, 12, 31, 12, 00, 00, DateTimeKind.Utc);
var expectedEnd = new DateTime(2020, 01, 02, 09, 50, 00, DateTimeKind.Utc);
@@ -72,16 +68,13 @@ public async Task CanProvideTimeRange(string command)
Assert.Equal(expectedEnd, end);
}
- [Theory]
- [InlineData("dotnet run --project ../../../../tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj localhost {remote-port}")]
- [InlineData("python python/remote.py localhost {remote-port}")]
-#if LINUX
- [InlineData("bash bash/remote.sh localhost {remote-port}")]
-#endif
- public async Task CanProvideAvailability(string command)
+[Fact]
+ public async Task CanProvideAvailability()
{
+ await _fixture.Initialize;
+
var dataSource = new Remote() as IDataSource;
- var context = CreateContext(command);
+ var context = CreateContext();
await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None);
@@ -92,20 +85,20 @@ public async Task CanProvideAvailability(string command)
Assert.Equal(2 / 144.0, actual, precision: 4);
}
- [Theory]
- [InlineData("dotnet run --project ../../../../tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj localhost {remote-port}", true)]
- [InlineData("python python/remote.py localhost {remote-port}", true)]
-#if LINUX
- [InlineData("bash bash/remote.sh localhost {remote-port}", false)]
-#endif
- public async Task CanReadFullDay(string command, bool complexData)
+[Fact]
+ public async Task CanReadFullDay()
{
+ // TODO fix this
+ var complexData = true;
+
+ await _fixture.Initialize;
+
var dataSource = new Remote() as IDataSource;
- var context = CreateContext(command);
+ var context = CreateContext();
await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None);
- var catalog = await dataSource.GetCatalogAsync("/A/B/C", CancellationToken.None);
+ var catalog = await dataSource.EnrichCatalogAsync(new ResourceCatalog("/A/B/C"), CancellationToken.None);
var resource = catalog.Resources![0];
var representation = resource.Representations![0];
@@ -148,7 +141,7 @@ void GenerateData(DateTimeOffset dateTime)
expectedStatus.AsSpan().Fill((byte)'s');
}
- var request = new ReadRequest(catalogItem, data, status);
+ var request = new ReadRequest(resource.Id, catalogItem, data, status);
await dataSource.ReadAsync(begin, end, [request], default!, new Progress(), CancellationToken.None);
var longData = new CastMemoryManager(data).Memory;
@@ -156,17 +149,14 @@ void GenerateData(DateTimeOffset dateTime)
Assert.True(expectedStatus.SequenceEqual(status.ToArray()));
}
- [Theory]
- [InlineData("dotnet run --project ../../../../tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj localhost {remote-port}")]
- [InlineData("python python/remote.py localhost {remote-port}")]
-#if LINUX
- [InlineData("bash bash/remote.sh localhost {remote-port}")]
-#endif
- public async Task CanLog(string command)
+[Fact]
+ public async Task CanLog()
{
+ await _fixture.Initialize;
+
var loggerMock = new Mock();
var dataSource = new Remote() as IDataSource;
- var context = CreateContext(command);
+ var context = CreateContext();
await dataSource.SetContextAsync(context, loggerMock.Object, CancellationToken.None);
@@ -182,17 +172,17 @@ public async Task CanLog(string command)
);
}
- [Theory]
- [InlineData("dotnet run --project ../../../../tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj localhost {remote-port}")]
- [InlineData("python python/remote.py localhost {remote-port}")]
- public async Task CanReadDataHandler(string command)
+[Fact]
+ public async Task CanReadDataHandler()
{
+ await _fixture.Initialize;
+
var dataSource = new Remote() as IDataSource;
- var context = CreateContext(command);
+ var context = CreateContext();
await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None);
- var catalog = await dataSource.GetCatalogAsync("/D/E/F", CancellationToken.None);
+ var catalog = await dataSource.EnrichCatalogAsync(new ResourceCatalog("/D/E/F"), CancellationToken.None);
var resource = catalog.Resources![0];
var representation = resource.Representations![0];
@@ -230,7 +220,7 @@ Task HandleReadDataAsync(string resourcePath, DateTime begin, DateTime end, Memo
return Task.CompletedTask;
}
- var request = new ReadRequest(catalogItem, data, status);
+ var request = new ReadRequest(resource.Id, catalogItem, data, status);
await dataSource.ReadAsync(begin, end, [request], HandleReadDataAsync, new Progress(), CancellationToken.None);
var doubleData = new CastMemoryManager(data).Memory;
@@ -238,10 +228,10 @@ Task HandleReadDataAsync(string resourcePath, DateTime begin, DateTime end, Memo
Assert.True(expectedStatus.SequenceEqual(status.ToArray()));
}
- private static DataSourceContext CreateContext(string command)
+ private static DataSourceContext CreateContext()
{
return new DataSourceContext(
- ResourceLocator: new Uri("file:///" + Path.Combine(Directory.GetCurrentDirectory(), "TESTDATA")),
+ ResourceLocator: new Uri("tcp://127.0.0.1:56145"),
SystemConfiguration: new Dictionary()
{
[typeof(Remote).FullName!] = JsonSerializer.SerializeToElement(new JsonObject()
@@ -254,10 +244,8 @@ private static DataSourceContext CreateContext(string command)
},
SourceConfiguration: new Dictionary()
{
- ["listen-address"] = JsonSerializer.SerializeToElement("127.0.0.1"),
- ["listen-port-min"] = JsonSerializer.SerializeToElement("63000"),
- ["template"] = JsonSerializer.SerializeToElement("local"),
- ["command"] = JsonSerializer.SerializeToElement(command),
+ ["type"] = JsonSerializer.SerializeToElement("Nexus.Sources.DotnetDataSource"),
+ ["resourceLocator"] = JsonSerializer.SerializeToElement("file:///" + Path.Combine(Directory.GetCurrentDirectory(), "TESTDATA")),
["environment-variables"] = JsonSerializer.SerializeToElement(new JsonObject()
{
["PYTHONPATH"] = $"{Path.Combine(Directory.GetCurrentDirectory(), "..", "..", "..", "..", "src", "remoting", "python-remoting")}"
diff --git a/tests/Nexus.Sources.Remote.Tests/RemoteTestsFixture.cs b/tests/Nexus.Sources.Remote.Tests/RemoteTestsFixture.cs
new file mode 100644
index 0000000..9e380bc
--- /dev/null
+++ b/tests/Nexus.Sources.Remote.Tests/RemoteTestsFixture.cs
@@ -0,0 +1,120 @@
+using System.Diagnostics;
+
+namespace Nexus.Sources.Tests;
+
+public class RemoteTestsFixture : IDisposable
+{
+ private Process? _buildProcess;
+
+ private Process? _runProcess;
+
+ private readonly SemaphoreSlim _semaphoreBuild = new(0, 1);
+
+ private readonly SemaphoreSlim _semaphoreRun = new(0, 1);
+
+ private bool _success;
+
+ public RemoteTestsFixture()
+ {
+ Initialize = Task.Run(async () =>
+ {
+ /* Why not `dotnet run`? Because it spawns a child process for which
+ * we do not know the process ID and so we cannot kill it.
+ */
+
+ // Build Nexus.Agent
+ var psi_build = new ProcessStartInfo("bash")
+ {
+ /* Why `sleep infinity`? Because the test debugger seems to stop whenever a child process stops */
+ Arguments = "-c \"dotnet build ../../../../src/Nexus.Agent/Nexus.Agent.csproj && sleep infinity\"",
+ UseShellExecute = false,
+ RedirectStandardOutput = true,
+ RedirectStandardError = true
+ };
+
+ _buildProcess = new Process
+ {
+ StartInfo = psi_build,
+ EnableRaisingEvents = true
+ };
+
+ _buildProcess.OutputDataReceived += (sender, e) =>
+ {
+ if (e.Data is not null && e.Data.Contains("Build succeeded"))
+ {
+ _success = true;
+ _semaphoreBuild.Release();
+ }
+ };
+
+ _buildProcess.ErrorDataReceived += (sender, e) =>
+ {
+ _success = false;
+ _semaphoreBuild.Release();
+ };
+
+ _buildProcess.Start();
+ _buildProcess.BeginOutputReadLine();
+ _buildProcess.BeginErrorReadLine();
+
+ await _semaphoreBuild.WaitAsync(TimeSpan.FromMinutes(1));
+
+ if (!_success)
+ throw new Exception("Unable to build Nexus.Agent.");
+
+ // Run Nexus.Agent
+ var psi_run = new ProcessStartInfo("dotnet")
+ {
+ Arguments = $"../../artifacts/bin/Nexus.Agent/debug/Nexus.Agent.dll",
+ WorkingDirectory="../../../../src/Nexus.Agent",
+ UseShellExecute = false,
+ RedirectStandardOutput = true,
+ RedirectStandardError = true
+ };
+
+ psi_run.Environment["NEXUSAGENT_Paths__Config"] = "../../.nexus-agent/config";
+
+ _runProcess = new Process
+ {
+ StartInfo = psi_run,
+ EnableRaisingEvents = true
+ };
+
+ _runProcess.OutputDataReceived += (sender, e) =>
+ {
+ // File.AppendAllText("/home/vincent/Downloads/output.txt", e.Data + Environment.NewLine);
+
+ if (e.Data is not null && e.Data.Contains("Now listening on"))
+ {
+ _success = true;
+ _semaphoreRun.Release();
+ }
+ };
+
+ _runProcess.ErrorDataReceived += (sender, e) =>
+ {
+ // File.AppendAllText("/home/vincent/Downloads/error.txt", e.Data + Environment.NewLine);
+
+ _success = false;
+ _semaphoreRun.Release();
+ };
+
+ _runProcess.Start();
+ _runProcess.BeginOutputReadLine();
+ _runProcess.BeginErrorReadLine();
+
+ await _semaphoreRun.WaitAsync(TimeSpan.FromMinutes(1));
+
+ if (!_success)
+ throw new Exception("Unable to launch Nexus.Agent.");
+ });
+ }
+
+ public Task Initialize { get; }
+
+ public void Dispose()
+ {
+ _buildProcess?.Kill();
+ _runProcess?.Kill();
+ }
+}
diff --git a/tests/Nexus.Sources.Remote.Tests/SetupDockerTests.cs b/tests/Nexus.Sources.Remote.Tests/SetupDockerTests.cs
deleted file mode 100644
index 928aba1..0000000
--- a/tests/Nexus.Sources.Remote.Tests/SetupDockerTests.cs
+++ /dev/null
@@ -1,103 +0,0 @@
-using Microsoft.Extensions.Logging.Abstractions;
-using Nexus.DataModel;
-using Nexus.Extensibility;
-using System.Text.Json;
-using System.Text.Json.Nodes;
-using Xunit;
-
-namespace Nexus.Sources.Tests;
-
-[Trait("TestCategory", "docker")]
-public class SetupDockerTests
-{
-#if LINUX
- [Theory]
- [InlineData("python", "main.py nexus-main {remote-port}", "v2.0.0-beta.25")]
- [InlineData("dotnet", "nexus-remoting-sample.csproj nexus-main {remote-port}", "v2.0.0-beta.24")]
-#endif
- public async Task CanReadFullDay(string satelliteId, string command, string version)
- {
- var dataSource = new Remote() as IDataSource;
- var context = CreateContext(satelliteId, command, version);
-
- await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None);
-
- var catalog = await dataSource.GetCatalogAsync("/A/B/C", CancellationToken.None);
- var resource = catalog.Resources![0];
- var representation = resource.Representations![0];
-
- var catalogItem = new CatalogItem(
- catalog with { Resources = default! },
- resource with { Representations = default! },
- representation,
- default);
-
- var begin = new DateTime(2020, 01, 01, 0, 0, 0, DateTimeKind.Utc);
- var end = new DateTime(2020, 01, 01, 0, 0, 10, DateTimeKind.Utc);
- var (data, status) = ExtensibilityUtilities.CreateBuffers(representation, begin, end);
-
- var length = 10;
- var expectedData = new double[length];
- var expectedStatus = new byte[length];
-
- for (int i = 0; i < length; i++)
- {
- expectedData[i] = i * 2;
- }
-
- expectedStatus.AsSpan().Fill(1);
-
- Task ReadData(string resourcePath, DateTime begin, DateTime end, Memory buffer, CancellationToken cancellationToken)
- {
- var spanBuffer = buffer.Span;
-
- for (int i = 0; i < length; i++)
- {
- spanBuffer[i] = i;
- }
-
- return Task.CompletedTask;
- }
-
- var request = new ReadRequest(catalogItem, data, status);
-
- await dataSource.ReadAsync(
- begin,
- end,
- [request],
- ReadData,
- new Progress(),
- CancellationToken.None);
-
- var doubleData = new CastMemoryManager(data).Memory;
-
- Assert.True(expectedData.SequenceEqual(doubleData.ToArray()));
- Assert.True(expectedStatus.SequenceEqual(status.ToArray()));
- }
-
- private static DataSourceContext CreateContext(string satelliteId, string command, string version)
- {
- return new DataSourceContext(
- ResourceLocator: default,
- SystemConfiguration: new Dictionary()
- {
- [typeof(Remote).FullName!] = JsonSerializer.SerializeToElement(new JsonObject()
- {
- ["templates"] = new JsonObject()
- {
- ["docker"] = $"ssh root@nexus-{satelliteId} bash run.sh {{git-url}} {{git-tag}} {{command}}"
- }
- })
- },
- SourceConfiguration: new Dictionary()
- {
- ["listen-address"] = JsonSerializer.SerializeToElement("0.0.0.0"),
- ["template"] = JsonSerializer.SerializeToElement("docker"),
- ["command"] = JsonSerializer.SerializeToElement(command),
- ["git-url"] = JsonSerializer.SerializeToElement($"https://github.com/nexus-main/nexus-remoting-template-{satelliteId}"),
- ["git-tag"] = JsonSerializer.SerializeToElement(version)
- },
- RequestConfiguration: default
- );
- }
-}
diff --git a/tests/Nexus.Sources.Remote.Tests/SetupDockerTests.sh b/tests/Nexus.Sources.Remote.Tests/SetupDockerTests.sh
deleted file mode 100644
index 2c770a9..0000000
--- a/tests/Nexus.Sources.Remote.Tests/SetupDockerTests.sh
+++ /dev/null
@@ -1,19 +0,0 @@
-setup_folder="setup/docker"
-satellite_ids="python dotnet"
-
-for satellite_id in $satellite_ids; do
- bash "${setup_folder}/setup-host.sh" $satellite_id
-done
-
-docker-compose --file "${setup_folder}/docker-compose.yml" up -d
-
-while true; do
- docker exec "nexus-main" test -f "/var/lib/nexus/ready" && \
- docker exec "nexus-${satellite_id}" test -f "/var/lib/nexus/ready" && \
- break
-
- echo "Waiting for Docker containers to become ready ..."
- sleep 1;
-done
-
-docker exec nexus-main bash -c "cd /root/nexus-sources-remote; dotnet test --filter TestCategory=docker"
\ No newline at end of file
diff --git a/tests/Nexus.Sources.Remote.Tests/bash/catalog.json b/tests/Nexus.Sources.Remote.Tests/bash/catalog.json
deleted file mode 100644
index 36f6f0e..0000000
--- a/tests/Nexus.Sources.Remote.Tests/bash/catalog.json
+++ /dev/null
@@ -1,41 +0,0 @@
-{
- "Catalog": {
- "Id": "/A/B/C",
- "Properties": {
- "a": "b",
- "c": 1
- },
- "Resources": [
- {
- "Id": "resource1",
- "Properties": {
- "unit": "°C",
- "groups": [
- "group1"
- ]
- },
- "Representations": [
- {
- "DataType": "INT64",
- "SamplePeriod": "00:00:01"
- }
- ]
- },
- {
- "Id": "resource2",
- "Properties": {
- "unit": "bar",
- "groups": [
- "group2"
- ]
- },
- "Representations": [
- {
- "DataType": "FLOAT64",
- "SamplePeriod": "00:00:01"
- }
- ]
- }
- ]
- }
-}
\ No newline at end of file
diff --git a/tests/Nexus.Sources.Remote.Tests/bash/remote.sh b/tests/Nexus.Sources.Remote.Tests/bash/remote.sh
deleted file mode 100644
index ad9ae17..0000000
--- a/tests/Nexus.Sources.Remote.Tests/bash/remote.sh
+++ /dev/null
@@ -1,159 +0,0 @@
-#!/bin/bash
-# test command = dotnet test --filter BashRpcDataSourceTests ./tests/Nexus.Core.Tests/Nexus.Core.Tests.csproj
-
-# quit on error
-set -o errexit
-
-# check prerequisites
-if ! command -v jq &> /dev/null; then
- echo "I require jq but it's not installed." 1>&2
- exit
-fi
-
-# main
-main() {
-
- # open sockets (https://admin-ahead.com/forum/general-linux/how-to-open-a-tcpudp-socket-in-a-bash-shell/)
- echo "Connecting to $1:$2 ..."
-
- exec 3<>/dev/tcp/$1/$2
- echo -ne "comm" >&3
-
- exec 4>/dev/tcp/$1/$2
- echo -ne "data" >&4
-
- echo "Starting to listen for JSON-RPC messages ..."
- listen
-
- read dummy
-}
-
-# process incoming messages
-listen() {
-
- while true; do
-
- # get json length
- read32BE json_length <&3
-
- # get json
- json=$(dd bs=$json_length count=1 <&3 2> /dev/null)
- tmp=$(echo $json | jq --raw-output '. | to_entries | map("[\(.key)]=\(.value)") | reduce .[] as $item ("invocation=("; . + $item + " ") + ")"')
- declare -A "$tmp"
-
- jsonrpc=${invocation[jsonrpc]}
- id=${invocation[id]}
- method=${invocation[method]}
-
- # check jsonrpc
- if [ "$jsonrpc" != "2.0" ]; then
- echo "Only JSON-RPC messages are supported." 1>&2
- exit 1
- fi
-
- # check id
- if [ -z "$id" ]; then
- echo "Notifications are not supported." 1>&2
- exit 1
- fi
-
- # prepare response
- echo "Received invocation for method '$method'. Preparing response ..."
-
- if [ "$method" = "getApiVersion" ]; then
- response='{ "jsonrpc": "2.0", "id": '$id', "result": { "ApiVersion": 1 } }'
-
- elif [ "$method" = "setContext" ]; then
- response='{ "jsonrpc": "2.0", "id": '$id', "result": null }'
-
- echo "Sending log message ..."
- log_message='{ "jsonrpc": "2.0", "method": "log", "params": [ "Information", "Logging works!" ] }'
- declare -i log_message_length=84
- write $log_message_length 32 "dummy" >&3
- printf "$log_message" >&3
-
- elif [ "$method" = "getCatalogIds" ]; then
- response='{ "jsonrpc": "2.0", "id": '$id', "result": { "CatalogIds": [ "/A/B/C" ] } }'
-
- elif [ "$method" = "getCatalog" ]; then
- catalog=$(&2
- exit 1
-
- fi
-
- # get response length
- local_lang=$LANG local_lc_all=$LC_ALL
- LANG=C LC_ALL=C
- byte_length=${#response}
- LANG=$local_lang LC_ALL=$local_lc_all
-
- # send response
- echo "Sending response ($byte_length bytes) ..."
- write $byte_length 32 "dummy" >&3
- printf "$response" >&3
-
- if [ "$method" = "readSingle" ]; then
-
- # send data (86400 seconds per day * 3 days * 8 bytes)
- echo "Sending data ..."
- printf 'd%.0s' {1..2073600} >&4
-
- # send status
- echo "Sending status ..."
- printf 's%.0s' {1..259200} >&4
-
- fi
-
- done
-}
-
-# read and write bytes (https://stackoverflow.com/questions/13889659/read-a-file-by-bytes-in-bash)
-read8() {
- local _r8_var=${1:-OUTBIN} _r8_car LANG=C IFS=
- read -r -d '' -n 1 _r8_car
- printf -v $_r8_var %d \'$_r8_car;
-}
-
-read16BE() {
- local _r16_var=${1:-OUTBIN} _r16_lb _r16_hb
- read8 _r16_hb
- read8 _r16_lb
- printf -v $_r16_var %d $(( _r16_hb<<8 | _r16_lb ));
-}
-
-read32BE() {
- local _r32_var=${1:-OUTBIN} _r32_lw _r32_hw
- read16BE _r32_hw
- read16BE _r32_lw
- printf -v $_r32_var %d $(( _r32_hw<<16| _r32_lw ));
-}
-
-# Usage: write [bits:64|32|16|8] [switchto big endian]
-write () {
- local i=$((${2:-64}/8)) o= v r
- r=$((i-1))
-
- for ((;i--;)) {
- printf -vv '\%03o' $(( ($1>>8*(0${3+-1}?i:r-i))&255 ))
- o+=$v
- }
-
- printf "$o"
-}
-
-# run main
-main "$@"; exit
\ No newline at end of file
diff --git a/tests/Nexus.Sources.Remote.Tests/dotnet/remote.cs b/tests/Nexus.Sources.Remote.Tests/dotnet/v1/remote.cs
similarity index 87%
rename from tests/Nexus.Sources.Remote.Tests/dotnet/remote.cs
rename to tests/Nexus.Sources.Remote.Tests/dotnet/v1/remote.cs
index d3b47cc..10d6868 100644
--- a/tests/Nexus.Sources.Remote.Tests/dotnet/remote.cs
+++ b/tests/Nexus.Sources.Remote.Tests/dotnet/v1/remote.cs
@@ -4,39 +4,12 @@
using Microsoft.Extensions.Logging;
using Nexus.DataModel;
using Nexus.Extensibility;
-using Nexus.Remoting;
-namespace Nexus.Remote;
+namespace Nexus.Sources;
-#warning Inherit from StructuredFileDataSource would be possible but collides with ReadAndModifyNexusData method"
-
-public static class Program
-{
- public static async Task Main(string[] args)
- {
- // args
- if (args.Length < 2)
- throw new Exception("No argument for address and/or port was specified.");
-
- // get address
- var address = args[0];
-
- // get port
- int port;
-
- try
- {
- port = int.Parse(args[1]);
- }
- catch (Exception ex)
- {
- throw new Exception("The second command line argument must be a valid port number.", ex);
- }
-
- var communicator = new RemoteCommunicator(new DotnetDataSource(), address, port);
- await communicator.RunAsync();
- }
-}
+/* Note: Inherit from StructuredFileDataSource would be possible
+ * but collides with ReadAndModifyNexusData method
+ */
public class DotnetDataSource : IDataSource
{
@@ -69,11 +42,9 @@ public Task GetCatalogRegistrationsAsync(string path, Can
return Task.FromResult(new CatalogRegistration[0]);
}
- public Task GetCatalogAsync(string catalogId, CancellationToken cancellationToken)
+ public Task EnrichCatalogAsync(ResourceCatalog catalog, CancellationToken cancellationToken)
{
- ResourceCatalog catalog;
-
- if (catalogId == "/A/B/C")
+ if (catalog.Id == "/A/B/C")
{
var representation1 = new Representation(NexusDataType.INT64, TimeSpan.FromSeconds(1));
@@ -97,7 +68,7 @@ public Task GetCatalogAsync(string catalogId, CancellationToken
.AddResources(resource1, resource2)
.Build();
}
- else if (catalogId == "/D/E/F")
+ else if (catalog.Id == "/D/E/F")
{
var representation = new Representation(NexusDataType.FLOAT64, TimeSpan.FromSeconds(1));
@@ -123,7 +94,7 @@ public Task GetCatalogAsync(string catalogId, CancellationToken
if (catalogId != "/A/B/C")
throw new Exception("Unknown catalog identifier.");
- var filePaths = Directory.GetFiles(_context.ResourceLocator.ToPath(), "*.dat", SearchOption.AllDirectories);
+ var filePaths = Directory.GetFiles(_context.ResourceLocator!.ToPath(), "*.dat", SearchOption.AllDirectories);
var fileNames = filePaths.Select(filePath => Path.GetFileName(filePath));
var dateTimes = fileNames
@@ -146,7 +117,7 @@ public Task GetAvailabilityAsync(string catalogId, DateTime begin, DateT
var periodPerFile = TimeSpan.FromMinutes(10);
var maxFileCount = (end - begin).Ticks / periodPerFile.Ticks;
- var filePaths = Directory.GetFiles(_context.ResourceLocator.ToPath(), "*.dat", SearchOption.AllDirectories);
+ var filePaths = Directory.GetFiles(_context.ResourceLocator!.ToPath(), "*.dat", SearchOption.AllDirectories);
var fileNames = filePaths.Select(filePath => Path.GetFileName(filePath));
var actualFileCount = fileNames
@@ -212,7 +183,7 @@ CancellationToken cancellationToken
while (currentBegin < end)
{
// find files
- var searchPath = Path.Combine(_context.ResourceLocator.ToPath(), currentBegin.ToString("yyyy-MM"), currentBegin.ToString("yyyy-MM-dd"));
+ var searchPath = Path.Combine(_context.ResourceLocator!.ToPath(), currentBegin.ToString("yyyy-MM"), currentBegin.ToString("yyyy-MM-dd"));
var filePaths = Directory.GetFiles(searchPath, "*.dat", SearchOption.AllDirectories);
foreach (var filePath in filePaths)
diff --git a/tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj b/tests/Nexus.Sources.Remote.Tests/dotnet/v1/remote.csproj
similarity index 58%
rename from tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj
rename to tests/Nexus.Sources.Remote.Tests/dotnet/v1/remote.csproj
index 5bd68a8..96a84c4 100644
--- a/tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj
+++ b/tests/Nexus.Sources.Remote.Tests/dotnet/v1/remote.csproj
@@ -2,15 +2,12 @@
$(TargetFrameworkVersion)
- Exe
-
-
-
-
-
+
+ runtime;native
+
diff --git a/version.json b/version.json
index d5c1679..eaf8289 100644
--- a/version.json
+++ b/version.json
@@ -1,4 +1,4 @@
{
"version": "2.0.0",
- "suffix": "beta.24"
+ "suffix": "beta.39"
}
\ No newline at end of file