Skip to content

Commit

Permalink
Merge pull request #124 from BU-Spark/addProgressBar
Browse files Browse the repository at this point in the history
Added Upload Tracking & History
  • Loading branch information
raheeqi authored Nov 3, 2024
2 parents d1992ef + 6761732 commit b030352
Show file tree
Hide file tree
Showing 8 changed files with 516 additions and 219 deletions.
224 changes: 142 additions & 82 deletions backend/graphql/resolvers/graphql_resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,21 @@ import { Collection } from "mongodb";
import { authMiddleware } from "../../authMiddleware.js";
import axios from "axios";
import { error } from "console";
const proxy_Url = "http://35.229.106.189:80/upload_csv";
const proxy_Url = process.env.REACT_APP_ML_PIP_URL || "";
import FormData from 'form-data';
import {createWriteStream} from 'fs';
import { GraphQLUpload } from "graphql-upload-minimal";
import { GraphQLScalarType, GraphQLError} from 'graphql';
import { FileUpload } from 'graphql-upload-minimal';
import path from 'path';
import { gql } from "@apollo/client";
import { Readable } from 'stream';
import { PubSub } from 'graphql-subscriptions';

const pubsub = new PubSub();
const UPLOAD_PROGRESS = 'UPLOAD_PROGRESS';
const UPLOAD_STATUS_UPDATED = 'UPLOAD_STATUS_UPDATED';




Expand All @@ -31,119 +38,160 @@ interface UploadCSVArgs {
file: Promise<FileUpload>; // `file` should be a promise that resolves to a FileUpload type
userId: string;
}
// Helper function to simulate delayed progress
const simulateProgressUpdate = (userId, filename, progress) => {
pubsub.publish("UPLOAD_PROGRESS", {
uploadProgress: { userId, filename, progress, status: "Uploading" },
});
};

// const getLastTenUploads = async (userId, db) => {
// const uploadData = db.collection("uploads");
// return await uploadData
// .find({ userId })
// .sort({ timestamp: -1 })
// .limit(10)
// .map(upload => ({ ...upload, article_cnt: upload.article_cnt || 0 })) // Set default value to 0 if null
// .toArray();


// };

export const resolvers = {
Upload: GraphQLUpload,
Mutation: {
uploadCSV: async (_, { file, userId }: UploadCSVArgs, { db, req, res }) => {

uploadCSV: async (_, { file, userId }, { db }) => {
try {
// await authMiddleware(req, res, () => {});

const upload = await file; // Resolve the file promise to get Upload object

const upload = await file; // Resolve the file promise to get the Upload object
if (!upload) {
throw new Error("No file uploaded");
}

// Destructure the resolved `upload` object to get file properties
const { createReadStream, filename, mimetype } = upload;

// Check if `createReadStream` is defined
if (!createReadStream) {
throw new Error("Invalid file upload. `createReadStream` is not defined.");
}

console.log(`Uploading file: ${filename} (Type: ${mimetype}) for user: ${userId}`);

// Step 2: Create a read stream from the file
// Step 1: Buffer the file to calculate size
const stream = createReadStream();
const chunks = [];
let fileSize = 0;

for await (const chunk of stream) {
chunks.push(chunk);
fileSize += chunk.length;
}

// Step 3: Prepare FormData for sending to an external service (optional)
const fileBuffer = Buffer.concat(chunks); // File is now fully buffered
const fileSizeKB = (fileSize / 1024).toFixed(1);
console.log(`File size: ${fileSizeKB} KB`);

// Step 2: Create a new stream from the buffer for uploading
const uploadStream = Readable.from(fileBuffer);
const formData = new FormData();
formData.append("file", uploadStream, { filename });
formData.append("user_id", userId);

// Step 3: Upload with progress updates
const response = await axios.post(proxy_Url, formData, {
headers: {
...formData.getHeaders(),
"X-API-KEY": "beri-stronk-key",
},
onUploadProgress: function (progressEvent) {
const progress = Math.min(
Math.round((progressEvent.loaded / fileSize) * 100),
100
);

// Emit progress update for the client
simulateProgressUpdate(userId, filename, progress);

const map = JSON.stringify({ "1": ["variables.file"] });
formData.append("operations", JSON.stringify({
query: `mutation UploadCSV($file: Upload!, $userId: String!): UploadStatus! {
uploadCSV(file: $file, user_id: $userId) {
filename
status
}
}`,
variables: { file: null, userId},
}));
formData.append("map", map);
formData.append("1", stream, { filename, contentType: mimetype });

formData.append('file', stream, { filename });
formData.append('user_id', userId);

// Step 4: Send the file to an external API

console.log('URL being used in production:' , proxy_Url);
const response = await axios.post(
proxy_Url,
formData,
{
headers: {
...formData.getHeaders(),
"X-API-KEY": "beri-stronk-key"
},
}
);
console.log(`Upload Progress: ${progress}%`);
},
});

// Handle API response
if (response.status === 200) {
console.log('File uploaded successfully to external API.');

// Step 5: Save upload metadata to the database (if needed)
const uploadData = db.collection('uploads');
const result = await uploadData.insertOne({
userId,
filename,
timestamp: new Date(),
status: 'Success',
console.log("File uploaded successfully to external API.");

// Publish completion message
pubsub.publish("UPLOAD_PROGRESS", {
uploadProgress: {
userId,
filename,
progress: 100,
status: "Upload complete!",
},
});

const uploadData = db.collection("uploads");
// const result = await uploadData.insertOne({
// userId,
// filename,
// timestamp: new Date(),
// status: "Success",
// size: fileSizeKB,
// });

await db.collection("uploads").watch().on('change', (change) => {
const updatedUpload = change.fullDocument;
pubsub.publish(UPLOAD_STATUS_UPDATED, {
uploadStatusUpdated: updatedUpload,
});
});

return { filename: filename, status: 'Success' };
return { filename, status: "Success" };
} else {
throw new GraphQLError('Failed to upload CSV.');
throw new GraphQLError("Failed to upload CSV.");
}
} catch (error) {
console.error('Error uploading CSV:', error);
throw new GraphQLError('Error uploading CSV.');
console.error("Error uploading CSV:", error);
throw new GraphQLError("Error uploading CSV.");
}
},
addRssFeed: async (_, { url, userID }, { db, req, res }) => {
await authMiddleware(req, res, () => {});

const decodedToken = JSON.parse(req.headers.user as string);
if (!decodedToken) {
throw new Error('Unauthorized');
}

if (decodedToken.sub !== userID) {
throw new Error('Forbidden');
}

const rss_data = db.collection("rss_links");

// Create or update the RSS feed for the given userID
const filter = { userID: userID };
const update = {
$set: { url: url, userID: userID },
};
const options = {
upsert: true, // create a new document if no document matches the filter
returnDocument: "after", // return the modified document
};
},

addRssFeed: async (_, { url, userID }, { db, req, res }) => {
await authMiddleware(req, res, () => {});

const decodedToken = JSON.parse(req.headers.user as string);
if (!decodedToken) {
throw new Error('Unauthorized');
}

if (decodedToken.sub !== userID) {
throw new Error('Forbidden');
}

const rss_data = db.collection("rss_links");

// Create or update the RSS feed for the given userID
const filter = { userID: userID };
const update = {
$set: { url: url, userID: userID },
};
const options = {
upsert: true, // create a new document if no document matches the filter
returnDocument: "after", // return the modified document
};

const result = await rss_data.findOneAndUpdate(filter, update, options);

const result = await rss_data.findOneAndUpdate(filter, update, options);
return result.value;
},
},

Subscription: {
uploadProgress: {
subscribe: (_, { userId }) => pubsub.asyncIterator("UPLOAD_PROGRESS"),
},
uploadStatusUpdated: {
subscribe: () => pubsub.asyncIterator([UPLOAD_STATUS_UPDATED]),
},
},

return result.value;
},
},
Query: {
// RSS Resolver
getRssLinkByUserId: async (_, args, { db, req, res }) => {
Expand All @@ -162,6 +210,18 @@ export const resolvers = {
const rss_data = db.collection("rss_links");
const queryResult = await rss_data.find({ userID: args.userID }).toArray();
return queryResult;
},
lastTenUploads: async (_, { userId }, { db }) => {
const uploads = await db.collection("uploads")
.find({ userID: userId })
.sort({ timestamp: -1 })
.limit(10)
.toArray();

return uploads.map(upload => ({
...upload,
uploadID: upload.uploadID
}));
},
// CSV Upload Resolver
getUploadByUserId: async (_, args, { db, req, res }) => {
Expand Down
32 changes: 31 additions & 1 deletion backend/graphql/schemas/type_definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ export const typeDefs = gql`
userID: String!
}
type Subscription {
uploadProgress(userId: String!): UploadProgress
}
type UploadProgress {
userId: String!
filename: String!
progress: Int!
status: String
}
type Rss_data {
userID: String!
title: String!
Expand All @@ -29,6 +40,10 @@ export const typeDefs = gql`
message: String!
}
type Subscription {
uploadStatusUpdated: Uploads
}
type Mutation {
# Define a new mutation for uploading a CSV file
uploadCSV(file: Upload!, userId: String!): UploadStatus!
Expand All @@ -46,6 +61,21 @@ export const typeDefs = gql`
filename: String!
status: String!
}
type Query {
lastTenUploads(userId: String!): [UploadHistory!]!
}
type UploadHistory {
uploadID: String!
article_cnt: Int!
message: String!
status: String!
timestamp: String!
userID: String!
}
type Article {
Expand Down Expand Up @@ -135,4 +165,4 @@ export const typeDefs = gql`
}
`;
`;
16 changes: 4 additions & 12 deletions backend/graphql/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,15 @@ export interface DemographicsByTractsArgs {
tract: string;F
}

export interface IUploadedFile {
name: string;
size: number;
progress: number;
status: string;
file: FileUpload;
error?: string;
}

type UploadedFile = {
name: string;
size: number;
progress: number;
progress: number; // Track upload progress here
status: string;
error?: string; // fail to pass test
file: File; // store a reference to the File object
};
error?: string;
file: File;
};

// *** Data Types derived from the collections ***

Expand Down
Loading

0 comments on commit b030352

Please sign in to comment.