From dad574a90d41995f73bda908e6537eb6a8f1cbcb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Pr=C5=AF=C5=A1a?= <87543374+Patai5@users.noreply.github.com>
Date: Sun, 17 Nov 2024 13:28:34 +0100
Subject: [PATCH] fix: split GPT request to a separate route (#83)

* fix: split GPT request to a separate route

* feat: add user data to requests
---
 code/src/crawler.ts               | 27 +++++++---
 code/src/hooks/initial-cookies.ts |  5 ++
 code/src/routes/crawl-route.ts    | 81 +++++-----------------------
 code/src/routes/gpt-route.ts      | 88 +++++++++++++++++++++++++++++++
 code/src/routes/router.ts         | 15 ++++++
 code/src/types/user-data.ts       | 16 ++++++
 6 files changed, 157 insertions(+), 75 deletions(-)
 create mode 100644 code/src/routes/gpt-route.ts
 create mode 100644 code/src/routes/router.ts
 create mode 100644 code/src/types/user-data.ts

diff --git a/code/src/crawler.ts b/code/src/crawler.ts
index c9bd37a..ed824c8 100644
--- a/code/src/crawler.ts
+++ b/code/src/crawler.ts
@@ -1,7 +1,14 @@
-import { Dataset, NonRetryableError, PlaywrightCrawler, createRequestDebugInfo, log } from 'crawlee';
+import {
+    Dataset,
+    NonRetryableError,
+    PlaywrightCrawler,
+    PlaywrightCrawlingContext,
+    createRequestDebugInfo,
+    log,
+} from 'crawlee';
 
 import { initialCookiesHook } from './hooks/initial-cookies.js';
-import { crawlRoute } from './routes/crawl-route.js';
+import { LABELS, router } from './routes/router.js';
 import { Config } from './types/config.js';
 import { CrawlerState } from './types/crawler-state.js';
 import { ERROR_TYPE } from './utils.js';
@@ -17,18 +24,22 @@ export const createCrawler = async (config: Config) => {
             },
         },
         /**
-         * The default value scale up too quickly for larger runs, this value is half that
-         * - Scaling down is still the default value, meaning the pool will scale down faster than it scales up
+         * The default values scale up too quickly for larger runs; this makes the scaling more gradual.
+         * - Scaling down is also set to be faster, since the Playwright crawler tends to run into a lot of timeouts
          */
-        autoscaledPoolOptions: { scaleUpStepRatio: 0.025 },
+        autoscaledPoolOptions: { scaleUpStepRatio: 0.015, scaleDownStepRatio: 0.1 },
         retryOnBlocked: true,
         requestHandlerTimeoutSecs: 3 * 60,
         proxyConfiguration,
-        maxRequestsPerCrawl: maxPagesPerCrawl,
-        requestHandler: crawlRoute,
+        requestHandler: router,
         preNavigationHooks: [
             initialCookiesHook,
-            async () => {
+            async (context: PlaywrightCrawlingContext) => {
+                const { label } = context.request;
+
+                const isCrawlRoute = label === LABELS.CRAWL;
+                if (!isCrawlRoute) return;
+
                 const state = await crawler.useState<CrawlerState>();
                 if (state.pagesOpened >= maxPagesPerCrawl) {
                     const err = new NonRetryableError('Skipping this page');
diff --git a/code/src/hooks/initial-cookies.ts b/code/src/hooks/initial-cookies.ts
index bffba64..3131505 100644
--- a/code/src/hooks/initial-cookies.ts
+++ b/code/src/hooks/initial-cookies.ts
@@ -1,6 +1,7 @@
 import { Actor } from 'apify';
 import { PlaywrightCrawlingContext } from 'crawlee';
 
+import { LABELS } from '../routes/router.js';
 import { CrawlerState } from '../types/crawler-state.js';
 
 /**
@@ -9,6 +10,10 @@ import { CrawlerState } from '../types/crawler-state.js';
  */
 export const initialCookiesHook = async (context: PlaywrightCrawlingContext) => {
     const { page, crawler, request, session } = context;
+    const { label } = request.userData;
+
+    const isCrawlRoute = label === LABELS.CRAWL;
+    if (!isCrawlRoute) return;
 
     const state = await crawler.useState<CrawlerState>();
     const { initialCookies } = state.config;
diff --git a/code/src/routes/crawl-route.ts b/code/src/routes/crawl-route.ts
index 52ae5b5..a9c3224 100644
--- a/code/src/routes/crawl-route.ts
+++ b/code/src/routes/crawl-route.ts
@@ -1,25 +1,25 @@
-import { Actor } from 'apify';
-import { Dataset, KeyValueStore, NonRetryableError, PlaywrightCrawlingContext, log, sleep, utils } from 'crawlee';
+import { KeyValueStore, NonRetryableError, PlaywrightCrawlingContext, Request, log, sleep, utils } from 'crawlee';
 import { Page } from 'playwright';
 
+import { LABELS } from './router.js';
 import { validateInputCssSelectors } from '../configuration.js';
-import { ERROR_OCCURRED_MESSAGE, NonRetryableOpenaiAPIError, OpenaiAPIErrorToExitActor } from '../errors.js';
 import { OpenAIModelHandler } from '../models/openai.js';
 import { getNumberOfTextTokens, htmlToMarkdown, maybeShortsTextByTokenLength, shrinkHtml } from '../processors.js';
 import { CrawlerState } from '../types/crawler-state.js';
 import { PAGE_FORMAT } from '../types/input.js';
+import { CrawlRouteUserData, GptRequestUserData } from '../types/user-data.js';
 import { ERROR_TYPE, doesUrlMatchGlobs } from '../utils.js';
 
 /**
  * The main crawling route. Enqueues new URLs and processes the page by calling the GPT model.
  */
-export const crawlRoute = async (context: PlaywrightCrawlingContext) => {
+export const crawlRoute = async (context: PlaywrightCrawlingContext<CrawlRouteUserData>) => {
     const { request, page, enqueueLinks, closeCookieModals, crawler } = context;
 
     const kvStore = await KeyValueStore.open();
 
     const state = await crawler.useState<CrawlerState>();
-    const { config, modelStats } = state;
+    const { config } = state;
     const {
         dynamicContentWaitSecs,
         excludeUrlGlobs,
@@ -29,13 +29,10 @@ export const crawlRoute = async (context: PlaywrightCrawlingContext) => {
         maxCrawlingDepth,
         maxPagesPerCrawl,
         modelConfig,
-        modelSettings,
         pageFormat,
         removeElementsCssSelector,
         removeLinkUrls,
         saveSnapshots,
-        schema,
-        schemaDescription,
         skipGptGlobs,
         targetSelector,
     } = config;
@@ -119,8 +116,6 @@ export const crawlRoute = async (context: PlaywrightCrawlingContext) => {
 
     const instructionTokenLength = getNumberOfTextTokens(instructions);
 
-    let answer = '';
-    let jsonAnswer: null | object;
     const contentMaxTokens = model.modelConfig.maxTokens * 0.9 - instructionTokenLength; // 10% buffer for answer
     const pageContent = maybeShortsTextByTokenLength(originPageContent, contentMaxTokens);
 
@@ -154,64 +149,16 @@ export const crawlRoute = async (context: PlaywrightCrawlingContext) => {
     }
 
     const remainingTokens = getNumberOfTextTokens(pageContent) + instructionTokenLength;
-    try {
-        const answerResult = await model.processInstructionsWithRetry({
-            instructions,
-            content: pageContent,
-            schema,
-            schemaDescription,
-            modelSettings,
-            remainingTokens,
-            apifyClient: Actor.apifyClient,
-        });
-        answer = answerResult.answer;
-        jsonAnswer = answerResult.jsonAnswer;
-        model.updateApiCallUsage(answerResult.usage, modelStats);
-    } catch (error) {
-        if (error instanceof OpenaiAPIErrorToExitActor) {
-            throw await Actor.fail(error.message);
-        }
-        if (error instanceof NonRetryableOpenaiAPIError) {
-            await Actor.setStatusMessage(ERROR_OCCURRED_MESSAGE, { level: 'WARNING' });
-            return log.warning(error.message, { url });
-        }
-        throw error;
-    }
-
-    const answerLowerCase = answer?.toLocaleLowerCase() || '';
-    if (
-        answerLowerCase.includes('skip this page')
-        || answerLowerCase.includes('skip this url')
-        || answerLowerCase.includes('skip the page')
-        || answerLowerCase.includes('skip the url')
-        || answerLowerCase.includes('skip url')
-        || answerLowerCase.includes('skip page')
-    ) {
-        log.info(`Skipping page ${url} from output, the key word "skip this page" was found in answer.`, { answer });
-        return;
-    }
-
-    log.info(`Page ${url} processed.`, modelStats);
-
-    // Store the results
-    await Dataset.pushData({
-        url,
-        answer,
-        jsonAnswer,
-        htmlSnapshotUrl: snapshotKey
-            ? `https://api.apify.com/v2/key-value-stores/${kvStore.id}/records/${snapshotKey}.html`
-            : undefined,
-        screenshotUrl: snapshotKey
-            ? `https://api.apify.com/v2/key-value-stores/${kvStore.id}/records/${snapshotKey}.jpg`
-            : undefined,
-        sentContentUrl: sentContentKey
-            ? `https://api.apify.com/v2/key-value-stores/${kvStore.id}/records/${sentContentKey}`
-            : undefined,
-        '#debug': {
-            modelName: model.modelConfig.modelName,
-            modelStats,
-        },
+    const userData = { ...request.userData, pageContent, remainingTokens, snapshotKey, pageUrl: url, sentContentKey };
+    const gptRequest = new Request<GptRequestUserData>({
+        userData,
+        uniqueKey: snapshotKey,
+        url: 'https://fakeUrl.com',
+        skipNavigation: true,
+        label: LABELS.GPT,
     });
+
+    await crawler.addRequests([gptRequest], { forefront: true });
 };
 
 /**
diff --git a/code/src/routes/gpt-route.ts b/code/src/routes/gpt-route.ts
new file mode 100644
index 0000000..d57cd6e
--- /dev/null
+++ b/code/src/routes/gpt-route.ts
@@ -0,0 +1,88 @@
+import { Actor, Dataset } from 'apify';
+import { KeyValueStore, PlaywrightCrawlingContext, log } from 'crawlee';
+
+import { ERROR_OCCURRED_MESSAGE, NonRetryableOpenaiAPIError, OpenaiAPIErrorToExitActor } from '../errors.js';
+import { OpenAIModelHandler } from '../models/openai.js';
+import { CrawlerState } from '../types/crawler-state.js';
+import { GptRequestUserData } from '../types/user-data.js';
+
+export const gptRoute = async (context: PlaywrightCrawlingContext<GptRequestUserData>) => {
+    const { request, crawler } = context;
+    const { pageContent, remainingTokens, pageUrl, snapshotKey, sentContentKey } = request.userData;
+
+    const kvStore = await KeyValueStore.open();
+
+    const state = await crawler.useState<CrawlerState>();
+    const { config, modelStats } = state;
+    const { instructions, modelConfig, modelSettings, schema, schemaDescription } = config;
+
+    const model = new OpenAIModelHandler(modelConfig);
+
+    let answer = '';
+    let jsonAnswer: null | object;
+
+    log.info(`Calling GPT for page ${pageUrl}.`);
+
+    try {
+        const answerResult = await model.processInstructionsWithRetry({
+            instructions,
+            content: pageContent,
+            schema,
+            schemaDescription,
+            modelSettings,
+            remainingTokens,
+            apifyClient: Actor.apifyClient,
+        });
+        answer = answerResult.answer;
+        jsonAnswer = answerResult.jsonAnswer;
+        model.updateApiCallUsage(answerResult.usage, modelStats);
+    } catch (error) {
+        if (error instanceof OpenaiAPIErrorToExitActor) {
+            throw await Actor.fail(error.message);
+        }
+        if (error instanceof NonRetryableOpenaiAPIError) {
+            await Actor.setStatusMessage(ERROR_OCCURRED_MESSAGE, { level: 'WARNING' });
+            return log.warning(error.message, { url: pageUrl });
+        }
+        throw error;
+    }
+
+    const SKIP_PAGE_KEYWORDS = [
+        'skip this page',
+        'skip this url',
+        'skip the page',
+        'skip the url',
+        'skip url',
+        'skip page',
+    ];
+
+    const answerLowerCase = answer?.toLocaleLowerCase() || '';
+    if (SKIP_PAGE_KEYWORDS.some((keyword) => answerLowerCase.includes(keyword))) {
+        log.info(`Skipping page ${pageUrl} from output, a skip-page keyword was found in the answer.`, {
+            answer,
+        });
+        return;
+    }
+
+    log.info(`Page ${pageUrl} processed.`, modelStats);
+
+    // Store the results
+    await Dataset.pushData({
+        url: pageUrl,
+        answer,
+        jsonAnswer,
+        htmlSnapshotUrl: snapshotKey
+            ? `https://api.apify.com/v2/key-value-stores/${kvStore.id}/records/${snapshotKey}.html`
+            : undefined,
+        screenshotUrl: snapshotKey
+            ? `https://api.apify.com/v2/key-value-stores/${kvStore.id}/records/${snapshotKey}.jpg`
+            : undefined,
+        sentContentUrl: sentContentKey
+            ? `https://api.apify.com/v2/key-value-stores/${kvStore.id}/records/${sentContentKey}`
+            : undefined,
+        '#debug': {
+            modelName: model.modelConfig.modelName,
+            modelStats,
+        },
+    });
+};
diff --git a/code/src/routes/router.ts b/code/src/routes/router.ts
new file mode 100644
index 0000000..d8161cb
--- /dev/null
+++ b/code/src/routes/router.ts
@@ -0,0 +1,15 @@
+import { createPlaywrightRouter } from 'crawlee';
+
+import { crawlRoute } from './crawl-route.js';
+import { gptRoute } from './gpt-route.js';
+
+export const LABELS = {
+    GPT: 'GPT',
+    CRAWL: 'CRAWL',
+} as const;
+
+export const router = createPlaywrightRouter();
+
+router.addDefaultHandler(crawlRoute);
+router.addHandler(LABELS.CRAWL, crawlRoute);
+router.addHandler(LABELS.GPT, gptRoute);
diff --git a/code/src/types/user-data.ts b/code/src/types/user-data.ts
new file mode 100644
index 0000000..bbba337
--- /dev/null
+++ b/code/src/types/user-data.ts
@@ -0,0 +1,16 @@
+export type UserData = {
+    startUrl: string;
+};
+
+export type CrawlRouteUserData = UserData & {
+    depth?: number;
+    wasOpenedKey: string;
+};
+
+export type GptRequestUserData = {
+    pageContent: string;
+    remainingTokens: number;
+    pageUrl: string;
+    snapshotKey?: string;
+    sentContentKey?: string;
+};