-
-
Notifications
You must be signed in to change notification settings - Fork 47
/
Copy pathserver.rs
120 lines (102 loc) · 3.8 KB
/
server.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use std::fmt::Debug;
use std::sync::Arc;
use async_trait::async_trait;
use futures::{stream, Sink, SinkExt, StreamExt};
use tokio::net::TcpListener;
use pgwire::api::auth::noop::NoopStartupHandler;
use pgwire::api::copy::NoopCopyHandler;
use pgwire::api::query::{PlaceholderExtendedQueryHandler, SimpleQueryHandler};
use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo, QueryResponse, Response, Tag};
use pgwire::api::{ClientInfo, NoopErrorHandler, PgWireServerHandlers, Type};
use pgwire::error::ErrorInfo;
use pgwire::error::{PgWireError, PgWireResult};
use pgwire::messages::response::NoticeResponse;
use pgwire::messages::PgWireBackendMessage;
use pgwire::tokio::process_socket;
pub struct DummyProcessor;
impl NoopStartupHandler for DummyProcessor {}
#[async_trait]
impl SimpleQueryHandler for DummyProcessor {
async fn do_query<'a, C>(
&self,
client: &mut C,
query: &'a str,
) -> PgWireResult<Vec<Response<'a>>>
where
C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
C::Error: Debug,
PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
{
client
.send(PgWireBackendMessage::NoticeResponse(NoticeResponse::from(
ErrorInfo::new(
"NOTICE".to_owned(),
"01000".to_owned(),
format!("Query received {}", query),
),
)))
.await?;
if query.starts_with("SELECT") {
let f1 = FieldInfo::new("id".into(), None, None, Type::INT4, FieldFormat::Text);
let f2 = FieldInfo::new("name".into(), None, None, Type::VARCHAR, FieldFormat::Text);
let schema = Arc::new(vec![f1, f2]);
let data = vec![
(Some(0), Some("Tom")),
(Some(1), Some("Jerry")),
(Some(2), None),
];
let schema_ref = schema.clone();
let data_row_stream = stream::iter(data.into_iter()).map(move |r| {
let mut encoder = DataRowEncoder::new(schema_ref.clone());
encoder.encode_field(&r.0)?;
encoder.encode_field(&r.1)?;
encoder.finish()
});
Ok(vec![Response::Query(QueryResponse::new(
schema,
data_row_stream,
))])
} else {
Ok(vec![Response::Execution(Tag::new("OK").with_rows(1))])
}
}
}
struct DummyProcessorFactory {
handler: Arc<DummyProcessor>,
}
impl PgWireServerHandlers for DummyProcessorFactory {
type StartupHandler = DummyProcessor;
type SimpleQueryHandler = DummyProcessor;
type ExtendedQueryHandler = PlaceholderExtendedQueryHandler;
type CopyHandler = NoopCopyHandler;
type ErrorHandler = NoopErrorHandler;
fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
self.handler.clone()
}
fn extended_query_handler(&self) -> Arc<Self::ExtendedQueryHandler> {
Arc::new(PlaceholderExtendedQueryHandler)
}
fn startup_handler(&self) -> Arc<Self::StartupHandler> {
self.handler.clone()
}
fn copy_handler(&self) -> Arc<Self::CopyHandler> {
Arc::new(NoopCopyHandler)
}
fn error_handler(&self) -> Arc<Self::ErrorHandler> {
Arc::new(NoopErrorHandler)
}
}
#[tokio::main]
pub async fn main() {
let factory = Arc::new(DummyProcessorFactory {
handler: Arc::new(DummyProcessor),
});
let server_addr = "127.0.0.1:5432";
let listener = TcpListener::bind(server_addr).await.unwrap();
println!("Listening to {}", server_addr);
loop {
let incoming_socket = listener.accept().await.unwrap();
let factory_ref = factory.clone();
tokio::spawn(async move { process_socket(incoming_socket.0, None, factory_ref).await });
}
}