diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardSyncModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardSyncModel.scala index a3b0f9b4..2b969946 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardSyncModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardSyncModel.scala @@ -151,7 +151,16 @@ object DashboardSyncModel extends AbsDashboardModel { // enrollment/not-started/started/in-progress/completion count, live and retired courses val liveRetiredCourseEnrolmentDF = allCourseProgramCompletionWithDetailsDF.where(expr("category='Course' AND courseStatus IN ('Live', 'Retired') AND userStatus=1")) + val liveRetiredCourseProgramEnrolmentDF = allCourseProgramCompletionWithDetailsDF.where(expr("category IN ('Course', 'Program') AND courseStatus IN ('Live', 'Retired') AND userStatus=1")) val currentDate = LocalDate.now() + // Calculate twenty four hours ago + val twentyFourHoursAgo = currentDate.minusDays(1) + // Convert to LocalDateTime by adding a time component (midnight) + val twentyFourHoursAgoLocalDateTime = twentyFourHoursAgo.atStartOfDay() + // Get the epoch time in seconds with IST offset (toEpochSecond returns seconds, not millis) + val twentyFourHoursAgoEpochSeconds = twentyFourHoursAgoLocalDateTime.toEpochSecond(java.time.ZoneOffset.ofHoursMinutes(5, 30)) + println("yesterday epoch : "+twentyFourHoursAgoEpochSeconds) + val liveRetiredCourseProgramCompletedYesterdayDF = allCourseProgramCompletionWithDetailsDF.where(expr(s"category IN ('Course', 'Program') AND courseStatus IN ('Live', 'Retired') AND userStatus=1 AND dbCompletionStatus=2 AND courseCompletedTimestamp >= ${twentyFourHoursAgoEpochSeconds}")) // Calculate twelve months ago val twelveMonthsAgo = currentDate.minusMonths(12) // Convert to LocalDateTime by adding a time component (midnight) @@ -166,6 +175,8 @@ object DashboardSyncModel extends AbsDashboardModel { // in-progress + completed = started val
liveRetiredCourseInProgressDF = liveRetiredCourseStartedDF.where(expr("dbCompletionStatus=1")) val liveRetiredCourseCompletedDF = liveRetiredCourseStartedDF.where(expr("dbCompletionStatus=2")) + // course program completed + val liveRetiredCourseProgramCompletedDF = liveRetiredCourseProgramEnrolmentDF.where(expr("dbCompletionStatus=2")) // do both count(*) and countDistinct(userID) aggregates at once val enrolmentCountDF = liveRetiredCourseEnrolmentDF.agg(count("*").alias("count"), countDistinct("userID").alias("uniqueUserCount")) @@ -173,6 +184,8 @@ object DashboardSyncModel extends AbsDashboardModel { val startedCountDF = liveRetiredCourseStartedDF.agg(count("*").alias("count"), countDistinct("userID").alias("uniqueUserCount")) val inProgressCountDF = liveRetiredCourseInProgressDF.agg(count("*").alias("count"), countDistinct("userID").alias("uniqueUserCount")) val completedCountDF = liveRetiredCourseCompletedDF.agg(count("*").alias("count"), countDistinct("userID").alias("uniqueUserCount")) + val landingPageCompletedCountDF = liveRetiredCourseProgramCompletedDF.agg(count("*").alias("count"), countDistinct("userID").alias("uniqueUserCount")) + val landingPageCompletedYesterdayCountDF = liveRetiredCourseProgramCompletedYesterdayDF.agg(count("*").alias("count"), countDistinct("userID").alias("uniqueUserCount")) // unique user counts val enrolmentUniqueUserCount = enrolmentCountDF.select("uniqueUserCount").first().getLong(0) @@ -198,17 +211,23 @@ object DashboardSyncModel extends AbsDashboardModel { val startedCount = startedCountDF.select("count").first().getLong(0) val inProgressCount = inProgressCountDF.select("count").first().getLong(0) val completedCount = completedCountDF.select("count").first().getLong(0) + val landingPageCompletedCount = landingPageCompletedCountDF.select("count").first().getLong(0) + val landingPageCompletedYesterdayCount = landingPageCompletedYesterdayCountDF.select("count").first().getLong(0) Redis.update("dashboard_enrolment_count", 
enrolmentCount.toString) Redis.update("dashboard_not_started_count", notStartedCount.toString) Redis.update("dashboard_started_count", startedCount.toString) Redis.update("dashboard_in_progress_count", inProgressCount.toString) Redis.update("dashboard_completed_count", completedCount.toString) + Redis.update("lp_completed_count", landingPageCompletedCount.toString) + Redis.update("lp_completed_yesterday_count", landingPageCompletedYesterdayCount.toString) println(s"dashboard_enrolment_count = ${enrolmentCount}") println(s"dashboard_not_started_count = ${notStartedCount}") println(s"dashboard_started_count = ${startedCount}") println(s"dashboard_in_progress_count = ${inProgressCount}") println(s"dashboard_completed_count = ${completedCount}") + println(s"lp_completed_count = ${landingPageCompletedCount}") + println(s"lp_completed_yesterday_count = ${landingPageCompletedYesterdayCount}") // mdo-wise enrollment/not-started/started/in-progress/completion counts val liveRetiredCourseEnrolmentByMDODF = liveRetiredCourseEnrolmentDF.groupBy("userOrgID").agg(count("*").alias("count"), countDistinct("userID").alias("uniqueUserCount")) diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala index f57fb2ae..afc78559 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala @@ -90,6 +90,9 @@ object DataUtil extends Serializable { StructField("orgID", StringType, nullable = true), StructField("activeCount", LongType, nullable = true) )) + val monthlyActiveUsersSchema: StructType = StructType(Seq( + StructField("DAUOutput", LongType, nullable = true) + )) val timeSpentSchema: StructType = StructType(Seq( StructField("orgID", StringType, nullable = true), StructField("timeSpent", FloatType, nullable = true) diff --git 
a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/telemetry/SummaryRedisSyncModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/telemetry/SummaryRedisSyncModel.scala index b5a932fd..6fd0ab08 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/telemetry/SummaryRedisSyncModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/telemetry/SummaryRedisSyncModel.scala @@ -5,6 +5,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame, SparkSession} import org.ekstep.analytics.dashboard.DashboardUtil._ import org.ekstep.analytics.dashboard.DataUtil._ +import org.ekstep.analytics.dashboard.telemetry.SummaryRedisSyncModel.averageMonthlyActiveUsersDataFrame import org.ekstep.analytics.dashboard.{AbsDashboardModel, DashboardConfig, Redis} import org.ekstep.analytics.framework._ @@ -39,6 +40,15 @@ object SummaryRedisSyncModel extends AbsDashboardModel { val activeUsersLast24HoursDF = activeUsersLast24HoursDataFrame() Redis.dispatchDataFrame[Long]("dashboard_active_users_last_24_hours_by_org", activeUsersLast24HoursDF, "orgID", "activeCount") + //Monthly active users + // SELECT ROUND(AVG(daily_count * 1.0), 1) as DAUOutput FROM + // (SELECT COUNT(DISTINCT(actor_id)) AS daily_count, TIME_FLOOR(__time + INTERVAL '05:30' HOUR TO MINUTE, 'P1D') AS day_start FROM \"telemetry-events-syncts\" + // WHERE eid='IMPRESSION' AND actor_type='User' AND __time > CURRENT_TIMESTAMP - INTERVAL '30' DAY GROUP BY 2) + val averageMonthlyActiveUsersDF = averageMonthlyActiveUsersDataFrame() + val averageMonthlyActiveUsersCount = averageMonthlyActiveUsersDF.groupBy().agg(expr("CASE WHEN COUNT(*) > 0 THEN CAST(AVG(DAUOutput) AS LONG) ELSE 0 END").alias("count")).first().getLong(0) + Redis.update("lp_monthly_active_users", averageMonthlyActiveUsersCount.toString) + println(s"lp_monthly_active_users = ${averageMonthlyActiveUsersCount}") + Redis.closeRedisConnect() @@ -77,4 +87,15 @@ object 
SummaryRedisSyncModel extends AbsDashboardModel { df } + def averageMonthlyActiveUsersDataFrame()(implicit spark: SparkSession, conf: DashboardConfig) : DataFrame = { + val query = """SELECT ROUND(AVG(daily_count * 1.0), 1) as DAUOutput FROM (SELECT COUNT(DISTINCT(actor_id)) AS daily_count, TIME_FLOOR(__time + INTERVAL '05:30' HOUR TO MINUTE, 'P1D') AS day_start FROM \"telemetry-events-syncts\" WHERE eid='IMPRESSION' AND actor_type='User' AND __time > CURRENT_TIMESTAMP - INTERVAL '30' DAY GROUP BY 2)""" + var df = druidDFOption(query, conf.sparkDruidRouterHost).orNull + if (df == null) return emptySchemaDataFrame(Schema.monthlyActiveUsersSchema) + + df = df.withColumn("DAUOutput", expr("CAST(DAUOutput as LONG)")) // Important to cast as long otherwise a cast will fail later on + + show(df) + df + } + }