Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sequencer, charts)!: support uds for abci #1877

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion charts/sequencer/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 1.0.1
version: 1.0.2
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
Expand Down
2 changes: 1 addition & 1 deletion charts/sequencer/files/cometbft/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ version = "0.38.8"

# TCP or UNIX socket address of the ABCI application,
# or the name of an ABCI application compiled in with the CometBFT binary
proxy_app = "tcp://127.0.0.1:{{ .Values.ports.sequencerABCI }}"
proxy_app = "{{ include "sequencer.abci_url" . }}"

# A custom human readable name for this node
moniker = "{{ .Values.moniker }}"
Expand Down
2 changes: 0 additions & 2 deletions charts/sequencer/files/scripts/init-cometbft.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,3 @@ if [ ! -d "/cometbft/config" ]; then
else
cp /config/* /cometbft/config/
fi

chmod -R 0777 /cometbft
18 changes: 16 additions & 2 deletions charts/sequencer/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,23 @@ name: {{ .Values.moniker }}-sequencer-metrics
{{- end }}

{{/* New sequencer address */}}
{{- define "sequencer.address"}}{ "bech32m": "{{ . }}" }
{{- define "sequencer.address" -}}
{ "bech32m": "{{ . }}" }
{{- end }}

{{/* uint64 fee converted to a astria proto Uint128 with only lo set */}}
{{- define "sequencer.toUint128Proto"}}{ "lo": {{ . }} }
{{- define "sequencer.toUint128Proto" -}}
{ "lo": {{ . }} }
{{- end }}

{{- define "sequencer.socket_directory" -}}
/sockets/
{{- end }}

{{- define "sequencer.abci_url" -}}
{{- if and .Values.global.dev .Values.sequencer.abciUDS -}}
unix://{{- include "sequencer.socket_directory" . }}abci.sock
{{- else -}}
tcp://127.0.0.1:{{ .Values.ports.sequencerABCI }}
{{- end }}
{{- end }}
5 changes: 3 additions & 2 deletions charts/sequencer/templates/configmaps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ metadata:
name: {{ .Values.moniker }}-sequencer-env
namespace: {{ include "sequencer.namespace" . }}
data:
ASTRIA_SEQUENCER_LOG: "astria_sequencer=debug"
ASTRIA_SEQUENCER_LISTEN_ADDR: "127.0.0.1:{{ .Values.ports.sequencerABCI }}"
ASTRIA_SEQUENCER_LOG: "info"
ASTRIA_SEQUENCER_DB_FILEPATH: "/sequencer/penumbra.db"
ASTRIA_SEQUENCER_MEMPOOL_PARKED_MAX_TX_COUNT: "{{ .Values.sequencer.mempool.parked.maxTxCount }}"
# Socket address for GRPC server
Expand All @@ -74,6 +73,8 @@ data:
OTEL_EXPORTER_OTLP_TRACE_HEADERS: "{{ .Values.sequencer.otel.traceHeaders }}"
OTEL_SERVICE_NAME: "{{ tpl .Values.sequencer.otel.serviceName . }}"
{{- if not .Values.global.dev }}
ASTRIA_SEQUENCER_LISTEN_ADDR: "127.0.0.1:{{ .Values.ports.sequencerABCI }}"
{{- else }}
ASTRIA_SEQUENCER_ABCI_LISTENER_URL: "{{ include "sequencer.abci_url" . }}"
{{- end }}
---
9 changes: 9 additions & 0 deletions charts/sequencer/templates/statefulsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ spec:
labels:
app: {{ .Values.moniker }}-sequencer
spec:
securityContext:
runAsUser: 1000
fsGroup: 2000
initContainers:
- command: [ "/scripts/init-cometbft.sh" ]
name: config-cometbft
Expand Down Expand Up @@ -45,6 +48,8 @@ spec:
- mountPath: /sequencer
name: sequencer-shared-storage-vol
subPath: {{ .Values.moniker }}/sequencer
- mountPath: {{ include "sequencer.socket_directory" . }}
name: socket-volume
ports:
- containerPort: {{ .Values.ports.sequencerABCI }}
name: sequencer-abci
Expand Down Expand Up @@ -78,6 +83,8 @@ spec:
- mountPath: /secrets
readOnly: true
name: sequencer-secret-keys-vol
- mountPath: {{ include "sequencer.socket_directory" . }}
name: socket-volume
ports:
- containerPort: {{ .Values.ports.cometbftP2P }}
name: cometbft-p2p
Expand All @@ -95,6 +102,8 @@ spec:
cpu: {{ .Values.resources.cometbft.limits.cpu }}
memory: {{ .Values.resources.cometbft.limits.memory }}
volumes:
- name: socket-volume
emptyDir: {}
- name: cometbft-config-volume
configMap:
name: {{ .Values.moniker }}-cometbft-config
Expand Down
5 changes: 3 additions & 2 deletions charts/sequencer/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ images:
repo: ghcr.io/astriaorg/sequencer
pullPolicy: IfNotPresent
tag: 1.0.0
devTag: latest
devTag: local

moniker: ""
genesis:
Expand Down Expand Up @@ -105,6 +105,7 @@ genesis:
# pubKey: lV57+rGs2vac7mvkGHP1oBFGHPJM3a+WoAzeFDCJDNU=

sequencer:
abciUDS: true
mempool:
parked:
maxTxCount: 200
Expand Down Expand Up @@ -311,7 +312,7 @@ storage:
local: true
entities:
sequencerSharedStorage:
size: "5Gi"
size: "50Gi"
persistentVolumeName: "sequencer-shared-storage"
path: "/data/sequencer-data"

Expand Down
8 changes: 8 additions & 0 deletions crates/astria-sequencer/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Ensure all deposit assets are trace prefixed [#1807](https://github.com/astriaorg/astria/pull/1807).
- Update `idna` dependency to resolve cargo audit warning [#1869](https://github.com/astriaorg/astria/pull/1869).

### Removed

- Remove ASTRIA_SEQUENCER_LISTEN_ADDR config variable [#1877](https://github.com/astriaorg/astria/pull/1877)

### Added

- Add ASTRIA_SEQUENCER_ABCI_LISTENER_URL config variable [#1877](https://github.com/astriaorg/astria/pull/1877)

## [1.0.0] - 2024-10-25

### Changed
Expand Down
1 change: 1 addition & 0 deletions crates/astria-sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ tower = "0.4"
tower-abci = "0.12.0"
tower-actor = "0.1.0"
tower-http = { version = "0.4", features = ["cors"] }
url = "2.5.4"

async-trait = { workspace = true }
base64 = { workspace = true }
Expand Down
8 changes: 5 additions & 3 deletions crates/astria-sequencer/local.env.example
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Socket address to listen for ABCI requests from cometbft.
# This address corresponds to the `--proxy_app "tcp://<ASTRIA_SEQUENCER_LISTEN_ADDR>"`,
# where `tcp://127.0.0.1:26658` is comebft's default.
ASTRIA_SEQUENCER_LISTEN_ADDR="127.0.0.1:26658"
# This address corresponds to the `--proxy_app "<ASTRIA_SEQUENCER_ABCI_LISTENER_URL>"`,
# where `tcp://127.0.0.1:26658` is comebft's default. Can also be configured to
# use a unix address ie `unix:///socket/astria_abci.sock`. Generally will see
# much higher performance with a unix socket.
ASTRIA_SEQUENCER_ABCI_LISTENER_URL="tcp://127.0.0.1:26658"

# Path to rocksdb
ASTRIA_SEQUENCER_DB_FILEPATH="/tmp/astria_db"
Expand Down
2 changes: 1 addition & 1 deletion crates/astria-sequencer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde::{
#[derive(Debug, Deserialize, Serialize)]
pub struct Config {
/// The endpoint on which Sequencer will listen for ABCI requests
pub listen_addr: String,
pub abci_listener_url: String,
/// The path to penumbra storage db.
pub db_filepath: PathBuf,
/// Log level: debug, info, warn, or error
Expand Down
123 changes: 89 additions & 34 deletions crates/astria-sequencer/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use astria_eyre::{
eyre::{
eyre,
OptionExt as _,
Report,
Result,
WrapErr as _,
},
Expand Down Expand Up @@ -33,6 +34,7 @@ use tracing::{
info,
info_span,
};
use url::Url;

use crate::{
app::App,
Expand Down Expand Up @@ -95,26 +97,7 @@ impl Sequencer {
.await
.wrap_err("failed to initialize app")?;

let consensus_service = tower::ServiceBuilder::new()
.layer(request_span::layer(|req: &ConsensusRequest| {
req.create_span()
}))
.service(tower_actor::Actor::new(10, |queue: _| {
let storage = storage.clone();
async move { service::Consensus::new(storage, app, queue).run().await }
}));
let mempool_service = service::Mempool::new(storage.clone(), mempool.clone(), metrics);
let info_service =
service::Info::new(storage.clone()).wrap_err("failed initializing info service")?;
let snapshot_service = service::Snapshot;

let server = Server::builder()
.consensus(consensus_service)
.info(info_service)
.mempool(mempool_service)
.snapshot(snapshot_service)
.finish()
.ok_or_eyre("server builder didn't return server; are all fields set?")?;

let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let (server_exit_tx, server_exit_rx) = tokio::sync::oneshot::channel();
Expand All @@ -125,20 +108,15 @@ impl Sequencer {
.wrap_err("failed to parse grpc_addr address")?;
let grpc_server_handle = start_grpc_server(&storage, mempool, grpc_addr, shutdown_rx);

span.in_scope(|| info!(config.listen_addr, "starting sequencer"));
let server_handle = tokio::spawn(async move {
match server.listen_tcp(&config.listen_addr).await {
Ok(()) => {
// this shouldn't happen, as there isn't a way for the ABCI server to exit
info_span!("abci_server").in_scope(|| info!("ABCI server exited successfully"));
}
Err(e) => {
error_span!("abci_server")
.in_scope(|| error!(err = e.as_ref(), "ABCI server exited with error"));
}
}
let _ = server_exit_tx.send(());
});
span.in_scope(|| info!(config.abci_listener_url, "starting abci sequencer"));
let abci_server_handle = start_abci_server(
&storage,
app,
mempool_service,
&config.abci_listener_url,
server_exit_tx,
)
.wrap_err("failed to start ABCI server")?;

select! {
_ = signals.stop_rx.changed() => {
Expand All @@ -157,7 +135,7 @@ impl Sequencer {
.await
.wrap_err("grpc server task failed")?
.wrap_err("grpc server failed")?;
server_handle.abort();
abci_server_handle.abort();
Ok(())
}
}
Expand Down Expand Up @@ -210,6 +188,83 @@ fn start_grpc_server(
)
}

fn start_abci_server(
storage: &cnidarium::Storage,
app: App,
mempool_service: service::Mempool,
listen_url: &str,
server_exit_tx: oneshot::Sender<()>,
) -> Result<JoinHandle<()>, Report> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace this by the eyre::Result type alias:

Suggested change
) -> Result<JoinHandle<()>, Report> {
) -> eyre::Result<JoinHandle<()>> {

// Setup services required for the ABCI server
let consensus_service = tower::ServiceBuilder::new()
.layer(request_span::layer(|req: &ConsensusRequest| {
req.create_span()
}))
.service(tower_actor::Actor::new(10, |queue: _| {
let storage = storage.clone();
async move { service::Consensus::new(storage, app, queue).run().await }
}));
let info_service =
service::Info::new(storage.clone()).wrap_err("failed initializing info service")?;
let snapshot_service = service::Snapshot;

// Builds the server but does not start listening.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove comment

let server = Server::builder()
.consensus(consensus_service)
.info(info_service)
.mempool(mempool_service)
.snapshot(snapshot_service)
.finish()
.ok_or_eyre("server builder didn't return server; are all fields set?")?;

// Validate and parse the listen_url received from the config.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This (as well as the subsequent server start) could be done more elegantly by providing a type AbciListenUrl with variants Tcp and Unix and implement FromStr for it. Then we could also have a few simple unit tests.

let abci_url = Url::parse(listen_url).wrap_err("failed to parse listen_addr")?;
let validated_listen_addr = match abci_url.scheme() {
"unix" => match abci_url.to_file_path() {
Ok(_) => Ok(abci_url.path().to_string()),
Err(e) => Err(eyre!(
"failed parsing unix listen_addr file path, error: {:?}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figure eyre::WrapErr::wrap_err was not doable because this is a boxed error? We have solved a related issue in astria_eyre by wrapping the boxed error. You can then construct a proper chain:

eyre!(e).wrap_err("failed rasing unix listen_addr file path")

e
)),
},
"tcp" => {
let host_str = abci_url
.host_str()
.ok_or_eyre("missing host in tcp listen_addr")?;
let port = abci_url
.port()
.ok_or_eyre("missing port in tcp listen_addr")?;
Ok(format!("{host_str}:{port}"))
}
// If more options are added here will also need to update the server startup
// immediately below to support more than two protocols.
_ => Err(eyre!(
"unsupported protocol in `abci_listener_url`, only unix and tcp are supported"
)),
}?;

let server_handle = tokio::spawn(async move {
let server_listen_result = if abci_url.scheme() == "unix" {
server.listen_unix(validated_listen_addr).await
} else {
server.listen_tcp(validated_listen_addr).await
};
match server_listen_result {
Ok(()) => {
// this shouldn't happen, as there isn't a way for the ABCI server to exit
info_span!("abci_server").in_scope(|| info!("ABCI server exited successfully"));
}
Err(e) => {
error_span!("abci_server")
.in_scope(|| error!(err = e.as_ref(), "ABCI server exited with error"));
}
}
let _ = server_exit_tx.send(());
});

Ok(server_handle)
}

struct SignalReceiver {
stop_rx: watch::Receiver<()>,
}
Expand Down
Loading