forked from berty/berty
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservice.ts
106 lines (96 loc) · 3.04 KB
/
service.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import * as pbjs from 'protobufjs'
import { ServiceClientType } from './welsh-clients.gen'
/** Returns `str` with its first character lower-cased; the remainder is left as-is. */
const lowerFirst = (str: string) => {
  const head = str.slice(0, 1)
  const tail = str.slice(1)
  return head.toLowerCase() + tail
}
// Unary generator
/**
 * Builds the client-side function for a single unary RPC method.
 *
 * The returned async function:
 *  1. encodes `payload` with the method's resolved request type,
 *  2. calls `unaryCall(method, encodedRequest, metadata)` and awaits it,
 *  3. decodes the awaited value with the method's resolved response type.
 *
 * When a resolved type is missing, the matching encode/decode step is skipped
 * and produces `undefined`.
 *
 * @param method - Method whose resolved request/response types are read once, up front.
 * @param unaryCall - Transport callback. It is typed `unknown`, so it is checked
 *   when the generated function runs; building the function never throws.
 * @returns `(payload, metadata) => Promise<decoded response>`; the promise
 *   rejects with a `TypeError` when `unaryCall` is not a function.
 */
const createUnaryMethod = <M extends pbjs.Method>(method: M, unaryCall: unknown) => {
  const requestType = method.resolvedRequestType
  const responseType = method.resolvedResponseType

  return async (payload: Uint8Array, metadata: unknown) => {
    const req = requestType?.encode(payload).finish()
    // Narrow `unknown` before invoking it; a non-function rejects here exactly
    // where the unchecked call used to fail.
    if (typeof unaryCall !== 'function') {
      throw new TypeError('unaryCall is not a function')
    }
    const res = await unaryCall(method, req, metadata)
    return responseType?.decode(res)
  }
}
/**
 * Collects the unary methods of `service` into a plain object.
 *
 * A method is treated as unary when neither `requestStream` nor
 * `responseStream` is set. Each one is stored under its name with the first
 * letter lower-cased, and the stored value is whatever
 * `middleware(method, generatedCall)` returns.
 *
 * @param service - Service whose `methods` map is enumerated (a missing map is
 *   treated as empty).
 * @param rpcImpl - Object providing `unaryCall`; it is only read when a unary
 *   method is actually found.
 * @param middleware - Wrapper invoked as `middleware(method, generatedCall)`.
 * @returns The object of wrapped unary calls.
 * @throws TypeError when a unary method exists and `middleware` is not a function.
 */
const createUnaryMethodList = <S extends pbjs.Service>(
  service: S,
  rpcImpl: unknown,
  middleware: unknown,
) => {
  const methods: Record<string, unknown> = {}

  for (const [name, method] of Object.entries(service.methods || {})) {
    if (method.responseStream || method.requestStream) {
      continue
    }
    const lkey = lowerFirst(name)
    // `rpcImpl` is `unknown` to callers; view it as "something that may carry
    // unaryCall" and let createUnaryMethod validate the callback when it is used.
    const genMethod = createUnaryMethod(method, (rpcImpl as { unaryCall?: unknown }).unaryCall)
    if (typeof middleware !== 'function') {
      throw new TypeError('middleware is not a function')
    }
    methods[lkey] = middleware(method, genMethod)
  }

  return methods
}
// Streams generator
/**
 * Builds the client-side function for a single streaming RPC method.
 *
 * The returned async function encodes the initial `payload` with the method's
 * resolved request type (skipped, yielding `undefined`, when that type is
 * missing), opens the stream with `streamCall(method, encodedRequest, metadata)`
 * and resolves to a controller object wrapping the stream:
 *
 *  - `onMessage(listener)`: registers a handler on the stream. Stream errors are
 *    forwarded as `listener(null, err)`; otherwise the buffer is decoded with
 *    the resolved response type and delivered as `listener(message, null)`.
 *  - `emit(payload)`: encodes `payload` and forwards it to `stream.emit`.
 *  - `start()`, `stop()`, `stopAndRecv()`: forwarded to the stream.
 *
 * @param method - Method whose resolved request/response types are read once, up front.
 * @param streamCall - Transport callback. It is typed `unknown`, so it is checked
 *   when the generated function runs; building the function never throws.
 * @returns `(payload, metadata) => Promise<controller>`; the promise rejects
 *   with a `TypeError` when `streamCall` is not a function.
 */
const createStreamMethod = <M extends pbjs.Method>(method: M, streamCall: unknown) => {
  const requestType = method.resolvedRequestType
  const responseType = method.resolvedResponseType

  return async (payload: Uint8Array, metadata: unknown) => {
    const req = requestType?.encode(payload).finish()
    // Narrow `unknown` before invoking it; a non-function rejects here exactly
    // where the unchecked call used to fail.
    if (typeof streamCall !== 'function') {
      throw new TypeError('streamCall is not a function')
    }
    const stream = await streamCall(method, req, metadata)

    return {
      onMessage: (listener: (message: unknown, error: unknown) => void) => {
        stream.onMessage((buf: Uint8Array, err: unknown) => {
          if (err) {
            listener(null, err)
            return
          }
          // Decoding and the listener call share one `try`: a message that
          // cannot be decoded is logged and skipped, and an exception thrown by
          // the listener is logged under the same message instead of propagating.
          try {
            if (!responseType) {
              throw new TypeError('response type is not resolved')
            }
            listener(responseType.decode(buf), null)
          } catch (e) {
            console.error('invalid response type', e)
          }
        })
      },
      emit: async (nextPayload: Uint8Array) => {
        // Unlike the initial request, emitting without a request type rejects.
        if (!requestType) {
          throw new TypeError('request type is not resolved')
        }
        return stream.emit(requestType.encode(nextPayload).finish())
      },
      start: async () => stream.start(),
      stop: async () => stream.stop(),
      stopAndRecv: async () => stream.stopAndRecv(),
    }
  }
}
/**
 * Collects the streaming methods of `service` into a plain object.
 *
 * A method is treated as streaming when `requestStream` or `responseStream`
 * is set. Each one is stored under its name with the first letter lower-cased,
 * and the stored value is whatever `middleware(method, generatedCall)` returns.
 *
 * The type parameter is constrained to `pbjs.Service`, the same constraint the
 * unary list uses, because this function reads `service.methods`.
 *
 * @param service - Service whose `methods` map is enumerated (a missing map is
 *   treated as empty).
 * @param rpcImpl - Object providing `streamCall`; it is only read when a
 *   streaming method is actually found.
 * @param middleware - Wrapper invoked as `middleware(method, generatedCall)`.
 * @returns The object of wrapped streaming calls.
 * @throws TypeError when a streaming method exists and `middleware` is not a function.
 */
const createStreamMethodList = <S extends pbjs.Service>(
  service: S,
  rpcImpl: unknown,
  middleware: unknown,
) => {
  const methods: Record<string, unknown> = {}

  for (const [name, method] of Object.entries(service.methods || {})) {
    if (!method.responseStream && !method.requestStream) {
      continue
    }
    const lkey = lowerFirst(name)
    // `rpcImpl` is `unknown` to callers; view it as "something that may carry
    // streamCall" and let createStreamMethod validate the callback when it is used.
    const genMethod = createStreamMethod(method, (rpcImpl as { streamCall?: unknown }).streamCall)
    if (typeof middleware !== 'function') {
      throw new TypeError('middleware is not a function')
    }
    methods[lkey] = middleware(method, genMethod)
  }

  return methods
}
/**
 * Creates a client object for `service` on top of a caller-supplied transport.
 *
 * The service is resolved with `resolveAll()`, then its unary and streaming
 * methods are generated separately and merged into one object.
 *
 * NOTE(review): `T` is constrained to the rpc client constructor type, but the
 * value is used here as a reflection service (`resolveAll()` is called on it
 * and its `methods` are enumerated) — confirm the intended constraint.
 *
 * @param service - Service definition to build the client from.
 * @param rpcImpl - Transport implementation handed to the method generators,
 *   which read `unaryCall` / `streamCall` from it.
 * @param middleware - Optional wrapper called as `middleware(method, call)` for
 *   every generated call; any falsy value falls back to a pass-through that
 *   returns `call` unchanged.
 * @returns The merged object of generated methods.
 * @throws Error `'invalid service'` when `service` is falsy.
 * @throws Error `'no rpc implem provided'` when `rpcImpl` is `undefined` or `null`.
 */
export const createService = <T extends typeof pbjs.rpc.Service, S extends InstanceType<T>>(
  service: T,
  rpcImpl: unknown,
  middleware?: unknown,
): ServiceClientType<S> => {
  if (!service) {
    throw new Error('invalid service')
  }
  // `null` is rejected together with `undefined`: it would otherwise pass this
  // guard and only fail later, when a generator reads a property from it.
  if (rpcImpl === undefined || rpcImpl === null) {
    throw new Error('no rpc implem provided')
  }

  // Keep the parameter untouched; fall back to a pass-through wrapper.
  const appliedMiddleware = middleware || ((_method: unknown, call: unknown) => call)

  const sr = service.resolveAll()
  const unaryMethods = createUnaryMethodList(sr, rpcImpl, appliedMiddleware)
  const streamMethods = createStreamMethodList(sr, rpcImpl, appliedMiddleware)
  return Object.assign(unaryMethods, streamMethods)
}