Skip to content

Commit

Permalink
feat: add subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
MARCO MANCO s281564 authored and ralls0 committed May 18, 2021
1 parent 9e04e4a commit 48465ed
Show file tree
Hide file tree
Showing 11 changed files with 982 additions and 273 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@
.history
.idea
.vscode
node_modules

# MacBook
.DS_Store
1 change: 1 addition & 0 deletions qlkube/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
node_modules
.env
4 changes: 4 additions & 0 deletions qlkube/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,13 @@
},
"dependencies": {
"@graphql-tools/schema": "^7.1.4",
"@kubernetes/client-node": "^0.14.3",
"apollo-server": "2.24.0",
"apollo-server-express": "^2.9.7",
"compression": "^1.7.4",
"dotenv": "^9.0.2",
"express": "^4.17.1",
"express-graphql": "^0.12.0",
"got": "^9.6.0",
"graphql": "^14.5.8",
"graphql-tools": "^4.0.6",
Expand Down
101 changes: 49 additions & 52 deletions qlkube/src/decorateBaseSchema.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const { gql } = require('apollo-server-core');
const { extendSchema } = require('graphql/utilities');
const { addResolversToSchema } = require('@graphql-tools/schema');
const { capitalizeType } = require('./utils.js');

/**
*
Expand All @@ -12,63 +13,59 @@ const { addResolversToSchema } = require('@graphql-tools/schema');
* @returns
*/

function capitalizeType(name) {
return name[0].toUpperCase() + name.slice(1);
}

module.exports = {
decorateBaseSchema: function (
targetQuery,
extendedType,
baseSchema,
nameWrapper,
argsNeeded
) {
if (!targetQuery) throw 'Parameter targetQuery cannot be empty!';
if (!extendedType) throw 'Parameter extendedType cannot be empty!';
if (!nameWrapper) throw 'Parameter nameWrapper cannot be empty!';
if (!argsNeeded) throw 'Parameter argsNeeded cannot be empty!';
if (!baseSchema) throw 'Parameter baseSchema cannot be empty!';
function decorateBaseSchema(
targetQuery,
extendedType,
baseSchema,
nameWrapper,
argsNeeded
) {
if (!targetQuery) throw 'Parameter targetQuery cannot be empty!';
if (!extendedType) throw 'Parameter extendedType cannot be empty!';
if (!nameWrapper) throw 'Parameter nameWrapper cannot be empty!';
if (!argsNeeded) throw 'Parameter argsNeeded cannot be empty!';
if (!baseSchema) throw 'Parameter baseSchema cannot be empty!';

if (baseSchema.getQueryType().getFields()[targetQuery] === undefined)
throw 'Parameter targetQuery not valid!';
const targetType = baseSchema.getQueryType().getFields()[targetQuery];
if (baseSchema.getQueryType().getFields()[targetQuery] === undefined)
throw 'Parameter targetQuery not valid!';
const targetType = baseSchema.getQueryType().getFields()[targetQuery];

if (!targetType) throw 'targetType fault!';
if (!targetType) throw 'targetType fault!';

let typeWrapper = capitalizeType(nameWrapper);
let typeTargetQuery = capitalizeType(targetQuery);
const typeWrapper = capitalizeType(nameWrapper);
const typeTargetQuery = capitalizeType(targetQuery);

const extension = gql`
extend type ${extendedType} {
${nameWrapper}: ${typeWrapper}
}
type ${typeWrapper} {
${targetQuery}: ${typeTargetQuery}
}
`;

const resolvers = {
[extendedType]: {
[nameWrapper]: (parent, args, context, info) => {
let newParent = {};
for (e of argsNeeded) {
if (parent[e] === undefined)
throw `Error: ${e} is not into the parent object!`;
newParent[e] = parent[e];
const extension = gql`
extend type ${extendedType} {
${nameWrapper}: ${typeWrapper}
}
type ${typeWrapper} {
${targetQuery}: ${typeTargetQuery}
}
return newParent;
},
`;

const resolvers = {
[extendedType]: {
[nameWrapper]: (parent, args, context, info) => {
const newParent = {};
for (e of argsNeeded) {
if (parent[e] === undefined)
throw `Error: ${e} is not into the parent object!`;
newParent[e] = parent[e];
}
return newParent;
},
[typeWrapper]: {
[targetQuery]: (parent, args, context, info) => {
return targetType.resolve(parent, parent, context, info);
},
},
[typeWrapper]: {
[targetQuery]: (parent, args, context, info) => {
return targetType.resolve(parent, parent, context, info);
},
};
},
};

const extendedSchema = extendSchema(baseSchema, extension);
const newSchema = addResolversToSchema(extendedSchema, resolvers);
return newSchema;
}

const extendedSchema = extendSchema(baseSchema, extension);
const newSchema = addResolversToSchema(extendedSchema, resolvers);
return newSchema;
},
};
module.exports = { decorateBaseSchema };
120 changes: 120 additions & 0 deletions qlkube/src/decorateSubscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
const { gql } = require('apollo-server-core');
const { extendSchema } = require('graphql/utilities');
const { addResolversToSchema } = require('@graphql-tools/schema');
const { PubSub, withFilter } = require('apollo-server');
const { capitalizeType } = require('./utils.js');

const pubsub = new PubSub();

function decorateEnum(baseSchema, enumName, values) {
if (!baseSchema) throw 'Parameter baseSchema cannot be empty!';
if (!enumName) throw 'Parameter enumName cannot be empty!';
if (!values) throw 'Parameter values cannot be empty!';

if (baseSchema._typeMap[enumName] !== undefined)
throw 'Enum type is already present in the schema!';

let enumType = `enum ${enumName} {`;
values.forEach(val => {
enumType += `
${val}`;
});
enumType += `
}`;
const extension = gql`
${enumType}
`;

const newSchema = extendSchema(baseSchema, extension);
return newSchema;
}

function decorateSubscription(baseSchema, targetType, enumType) {
if (!baseSchema) throw 'Parameter baseSchema cannot be empty!';
if (!targetType) throw 'Parameter targetType cannot be empty!';
if (!enumType) throw 'Parameter enumType cannot be empty!';

if (baseSchema.getQueryType().getFields()[targetType] === undefined)
throw 'Target type not found into the schema';

const subscriptionField = `${targetType}Update`;
const label = targetType;

const subType =
baseSchema._typeMap.Subscription === undefined
? 'type Subscription'
: 'extend type Subscription';

const subscriptionType = capitalizeType(subscriptionField);
targetType = capitalizeType(targetType);

const extension = gql`
type ${subscriptionType} {
updateType: ${enumType}
payload: ${targetType}
}
${subType} {
${subscriptionField}(name: String, namespace: String!): ${subscriptionType}
}
`;

const resolvers = {
Subscription: {
[subscriptionField]: {
subscribe: withFilter(
() => pubsub.asyncIterator([label]),
(payload, variables, info, context) => {
return (
payload.apiObj.metadata.namespace === variables.namespace &&
(variables.name === undefined ||
payload.apiObj.metadata.name === variables.name)
);
}
),
resolve: async (payload, args, context, info) => {
return payload;
},
},
},
[subscriptionType]: {
updateType: (payload, args, context, info) => {
return payload.type;
},
payload: (payload, args, context, info) => {
return payload.apiObj;
},
},
};

const extendedSchema = extendSchema(baseSchema, extension);
const newSchema = addResolversToSchema(extendedSchema, resolvers);
newSchema._subscriptionType = newSchema._typeMap.Subscription;
return newSchema;
}

function setupSubscriptions(subscriptions, schema) {
if (!subscriptions) throw 'Parameter subscriptions cannot be empty!';
if (!schema) throw 'Parameter schema cannot be empty!';

let newSchema = decorateEnum(schema, 'UpdateType', [
'ADDED',
'MODIFIED',
'DELETED',
]);

subscriptions.forEach(e => {
newSchema = decorateSubscription(newSchema, e.type, 'UpdateType');
});

return newSchema;
}

function publishEvent(label, value) {
pubsub.publish(label, value);
}

module.exports = {
decorateSubscription,
setupSubscriptions,
publishEvent,
};
70 changes: 57 additions & 13 deletions qlkube/src/index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
const fs = require('fs').promises;
const { createServer } = require('http');
const express = require('express');
const { ApolloServer } = require('apollo-server-express');
const compression = require('compression');
const { createSchema } = require('./schema');
const { kwatch } = require('./watch.js');
const { setupSubscriptions } = require('./decorateSubscription.js');
const { subscriptions } = require('./subscriptions.js');
const getOpenApiSpec = require('./oas');
const { printSchema } = require('graphql');
const logger = require('pino')({ useLevelLabels: true });
const dotenv = require('dotenv');

dotenv.config();

main().catch(e =>
logger.error({ error: e.stack }, 'failed to start qlkube server')
Expand All @@ -25,16 +32,37 @@ async function main() {
: '';

const oas = await getOpenApiSpec(kubeApiUrl, token);
const schema = await createSchema(oas, kubeApiUrl, token);
let schema = await createSchema(oas, kubeApiUrl, token);

try {
schema = setupSubscriptions(subscriptions, schema);
} catch (e) {
console.error(e);
process.exit(1);
}

const server = new ApolloServer({
schema,
context: ({ req }) => {
if (req.headers.authorization && req.headers.authorization.length > 0) {
const strs = req.headers.authorization.split(' ');
var user = {};
user.token = strs[1];
return user;
subscriptions: {
path: '/subscription',
onConnect: (connectionParams, webSocket, context) => {
console.log('Connected!');
},
onDisconnect: (webSocket, context) => {
console.log('Disconnected!');
},
},

context: ({ req, connection }) => {
if (connection) {
return {};
} else {
if (req.headers.authorization && req.headers.authorization.length > 0) {
const strs = req.headers.authorization.split(' ');
var user = {};
user.token = strs[1];
return user;
}
}
},
});
Expand All @@ -52,10 +80,26 @@ async function main() {
app,
path: '/',
});
app.listen({ port: 8080 }, () =>
logger.info(
{ url: `http://localhost:8080${server.graphqlPath}` },
'🚀 Server ready'
)
);
const httpServer = createServer(app);
server.installSubscriptionHandlers(httpServer);

const PORT = process.env.CROWNLABS_QLKUBE_PORT || 8080;

httpServer.listen({ port: PORT }, () => {
console.log(
`🚀 Server ready at http://localhost:${PORT}${server.graphqlPath}`
);
console.log(
`🚀 Subscriptions ready at ws://localhost:${PORT}${server.subscriptionsPath}`
);
});

try {
subscriptions.forEach(sub => {
kwatch(sub.resource, sub.type);
});
} catch (e) {
console.error(e);
process.exit(1);
}
}
4 changes: 2 additions & 2 deletions qlkube/src/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ exports.createSchema = async (oas, kubeApiUrl, token) => {
let baseSchema = await oasToGraphQlSchema(oas, kubeApiUrl, token);
let allSchema = decorateSchema(baseSchema);
try {
let schemaWithInstanceTemplate = decorateBaseSchema(
const schemaWithInstanceTemplate = decorateBaseSchema(
'itPolitoCrownlabsV1alpha2Template',
'TemplateCrownlabsPolitoItTemplateRef',
allSchema,
'templateWrapper',
['name', 'namespace']
);
let schemaWithInstanceTenant = decorateBaseSchema(
const schemaWithInstanceTenant = decorateBaseSchema(
'itPolitoCrownlabsV1alpha1Tenant',
'TenantCrownlabsPolitoItTenantRef',
schemaWithInstanceTemplate,
Expand Down
6 changes: 6 additions & 0 deletions qlkube/src/subscriptions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
const subscriptions = [
{ resource: '/api/v1/pods', type: 'ioK8sApiCoreV1Pod' },
{ resource: '/api/v1/nodes', type: 'ioK8sApiCoreV1Node' },
];

module.exports = { subscriptions };
5 changes: 5 additions & 0 deletions qlkube/src/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
function capitalizeType(name) {
return name[0].toUpperCase() + name.slice(1);
}

module.exports = { capitalizeType };
Loading

0 comments on commit 48465ed

Please sign in to comment.