
🔧 fix: graphQL endpoint is not picking up data #105

Merged: 1 commit, Dec 17, 2024
cli/commands/jobs_manuscript.go: 31 additions, 0 deletions
@@ -9,6 +9,7 @@ import (
"net/http"
"os"
"path/filepath"
"strings"
"time"
)

@@ -129,6 +130,36 @@ func displayManuscriptStates(manuscripts []pkg.Manuscript, dockers []pkg.Contain
}

displayJobStatus(i+1, &m, state)

if state == pkg.StateRunning {
// Ask the Hasura metadata endpoint to track the manuscript's table so it is exposed over GraphQL
url := fmt.Sprintf("http://127.0.0.1:%d/v1/metadata", m.GraphQLPort)

payload := fmt.Sprintf(`{
"type": "bulk",
"source": "default",
"resource_version": 1,
"args": [{
"type": "postgres_track_tables",
"args": {
"allow_warnings": true,
"tables": [{
"table": {
"name": "%s",
"schema": "public"
},
"source": "default"
}]
}
}]
}`, m.Table)

resp, err := http.Post(url, "application/json", strings.NewReader(payload))
if err != nil {
return
}
defer resp.Body.Close()
}
}
}

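The block added above posts a bulk `postgres_track_tables` action to Hasura's `/v1/metadata` API so that the manuscript's table becomes visible to the GraphQL endpoint. For reference, here is a minimal standalone sketch of the same call that also inspects Hasura's response (which the code above discards); it assumes a locally reachable Hasura instance, and the port, table name, and `trackTable` helper are illustrative placeholders rather than part of this PR:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// trackTable asks a local Hasura instance to track a Postgres table so it
// becomes queryable over GraphQL. The port and table name are placeholders
// for the values the CLI reads from the manuscript (m.GraphQLPort, m.Table).
func trackTable(port int, table string) error {
	url := fmt.Sprintf("http://127.0.0.1:%d/v1/metadata", port)
	payload := fmt.Sprintf(`{
		"type": "bulk",
		"source": "default",
		"resource_version": 1,
		"args": [{
			"type": "postgres_track_tables",
			"args": {
				"allow_warnings": true,
				"tables": [{
					"table": {"name": "%s", "schema": "public"},
					"source": "default"
				}]
			}
		}]
	}`, table)

	resp, err := http.Post(url, "application/json", strings.NewReader(payload))
	if err != nil {
		return fmt.Errorf("metadata request failed: %w", err)
	}
	defer resp.Body.Close()

	// Surface tracking failures instead of silently ignoring the response.
	body, _ := io.ReadAll(resp.Body)
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("hasura returned %d: %s", resp.StatusCode, body)
	}
	return nil
}

func main() {
	// Placeholder arguments for illustration only.
	if err := trackTable(8081, "blocks"); err != nil {
		fmt.Println("track tables:", err)
	}
}
```

Once the table is tracked, a GraphQL query against the manuscript's endpoint should start returning rows, which is the symptom this PR fixes.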
cli/pkg/manuscript_state.go: 14 additions, 71 deletions
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"net/http"
"os/exec"
"regexp"
"strings"
"time"
)
@@ -70,7 +69,7 @@ func (sd *StateDetector) DetectState() (ManuscriptState, error) {
}

// Analyze Flink logs to determine the current state. If they're unreachable -> UNKNOWN
state, err := sd.analyzeFlinkLogs(jobManagerName)
state, err := sd.checkContainerStatus(jobManagerName)
if err != nil {
return StateUnknown, fmt.Errorf("failed to analyze logs: %w", err)
}
@@ -83,84 +82,28 @@
return state, nil
}

// analyzeFlinkLogs scans the logs of a Flink container to determine the current state
func (sd *StateDetector) analyzeFlinkLogs(containerName string) (ManuscriptState, error) {
// Create a context that will timeout after 5 seconds and ensure cleanup with defer
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Get the last 500 lines of logs
logs, err := GetContainerLogs(ctx, containerName, 500)
func (sd *StateDetector) checkContainerStatus(containerName string) (ManuscriptState, error) {
dockers, err := RunDockerPs()
if err != nil {
return StateUnknown, fmt.Errorf("failed to get container logs: %w", err)
}

// Create variables to store the most recent relevant log entries
var lastRunningMatch string
var lastInitMatch string
var lastFailMatch string
var lastFinishMatch string

// Define regular expressions to match relevant log entries
patterns := map[string]*regexp.Regexp{
string(StateRunning): regexp.MustCompile(
`(Job .+\(.+\) (switched to RUNNING|switched from .+ to RUNNING)|` +
`Completed checkpoint \d+ for job)`),
string(StateFailed): regexp.MustCompile(
`(Job [a-f0-9-]+ \(.*\) switched to FAILED|` +
`Exception in thread|Error|FATAL|` +
`Task failure|JobManager failure)`),
string(StateInitializing): regexp.MustCompile(
`(Starting JobManager|` +
`Starting TaskManager|` +
`Created new job|` +
`Submitting job|` +
`Job [a-f0-9-]+ \(.*\) is being submitted)`),
string(StateRunning) + "_FINISHED": regexp.MustCompile(`Job [a-f0-9-]+ \(.*\) switched to FINISHED`),
}

// Scan logs from newest to oldest
for i := len(logs) - 1; i >= 0; i-- {
line := logs[i]
// Check if the log entry matches any of the relevant patterns
if patterns[string(StateRunning)].MatchString(line) && lastRunningMatch == "" {
lastRunningMatch = line
}
if patterns[string(StateFailed)].MatchString(line) && lastFailMatch == "" {
lastFailMatch = line
}
if patterns[string(StateInitializing)].MatchString(line) && lastInitMatch == "" {
lastInitMatch = line
}
if patterns[string(StateRunning)+"_FINISHED"].MatchString(line) && lastFinishMatch == "" {
lastFinishMatch = line
}
container := sd.findContainer(containerName)
if container == nil {
return StateUnknown, fmt.Errorf("container not found: %s", containerName)
}

// Return state based on most recent relevant log entry
if lastFinishMatch != "" {
// If we found a FINISHED state and there's no FAILED state after it -> RUNNING
if lastFailMatch == "" || strings.Index(lastFinishMatch, lastFailMatch) > 0 {
return StateRunning, nil
for _, docker := range dockers {
if docker.Name == containerName {
if strings.Contains(docker.Status, "Up") {
return StateRunning, nil
} else if strings.Contains(docker.Status, "Exited") {
return StateFailed, nil
}
}
}
if lastRunningMatch != "" {
// If we found a RUNNING state and there's no FAILED state after it -> RUNNING
if lastFailMatch == "" || strings.Index(lastRunningMatch, lastFailMatch) > 0 {
return StateRunning, nil
}
}
if lastFailMatch != "" {
// If we found a FAILED pattern match, and it survived other checks -> FAILED
return StateFailed, nil
}
if lastInitMatch != "" {
// If we found an INITIALIZING pattern match, and it survived other checks -> INITIALIZING
return StateInitializing, nil
}

// By default, if a manuscript survives all checks, return RUNNING state
return StateRunning, nil
return StateUnknown, nil
}

func GetContainerLogs(ctx context.Context, containerName string, lines int) ([]string, error) {
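In the second file, the detector no longer scans Flink job logs; `checkContainerStatus` classifies the manuscript from the container status string reported by `docker ps`. Below is a small self-contained sketch of that mapping; the `ManuscriptState` type and constants are illustrative stand-ins for the project's `pkg` definitions, not the actual ones:

```go
package main

import (
	"fmt"
	"strings"
)

// ManuscriptState is a stand-in for the CLI's state type, used here only to
// illustrate the status mapping introduced in checkContainerStatus.
type ManuscriptState string

const (
	StateRunning ManuscriptState = "RUNNING"
	StateFailed  ManuscriptState = "FAILED"
	StateUnknown ManuscriptState = "UNKNOWN"
)

// stateFromDockerStatus applies the same rule as the new code: a status
// containing "Up" is treated as running, "Exited" as failed, and anything
// else falls through to unknown.
func stateFromDockerStatus(status string) ManuscriptState {
	switch {
	case strings.Contains(status, "Up"):
		return StateRunning
	case strings.Contains(status, "Exited"):
		return StateFailed
	default:
		return StateUnknown
	}
}

func main() {
	for _, s := range []string{"Up 3 hours", "Exited (1) 5 minutes ago", "Restarting (1) 2 seconds ago"} {
		fmt.Printf("%-30s -> %s\n", s, stateFromDockerStatus(s))
	}
}
```

One consequence of this mapping, visible in the example output, is that transitional statuses such as "Restarting" resolve to the unknown state rather than failed.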