Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding method for getting the reindexing Status #8805

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 61 additions & 24 deletions components/ingest-service/storage/db.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storage

import (
"encoding/json"
"time"

"github.com/chef/automate/components/ingest-service/config"
Expand Down Expand Up @@ -49,37 +50,23 @@ func RunMigrations(dbConf *config.Storage) error {
// CRUD Operations

// Create a new reindex request
func (db *DB) InsertReindexRequest(requestID int, status string) error {
_, err := db.Exec(insertReindexRequest, requestID, status, time.Now(), time.Now())
func (db *DB) InsertReindexRequest(requestID int, status string, currentTime time.Time) error {
_, err := db.Exec(insertReindexRequest, requestID, status, currentTime, currentTime)
return err
}

// Update an existing reindex request
func (db *DB) UpdateReindexRequest(requestID int, status string) error {
_, err := db.Exec(updateReindexRequest, status, time.Now(), requestID)
func (db *DB) UpdateReindexRequest(requestID int, status string, currentTime time.Time) error {
_, err := db.Exec(updateReindexRequest, status, currentTime, requestID)
return err
}

// Fetch a reindex request by ID
func (db *DB) GetReindexRequest(requestID int) (*ReindexRequest, error) {
var request ReindexRequest
err := db.SelectOne(&request, getReindexRequest, requestID)
return &request, err
}

// Insert reindex request detailed entry
func (db *DB) InsertReindexRequestDetailed(detail ReindexRequestDetailed) error {
_, err := db.Exec(insertReindexRequestDetailed, detail.RequestID, detail.Index, detail.FromVersion, detail.ToVersion, detail.Stage, detail.OsTaskID, detail.Heartbeat, detail.HavingAlias, detail.AliasList, time.Now(), time.Now())
func (db *DB) InsertReindexRequestDetailed(detail ReindexRequestDetailed, currentTime time.Time) error {
_, err := db.Exec(insertReindexRequestDetailed, detail.RequestID, detail.Index, detail.FromVersion, detail.ToVersion, detail.Stage, detail.OsTaskID, detail.Heartbeat, detail.HavingAlias, detail.AliasList, currentTime, currentTime)
return err
}

// Fetch reindex request details
func (db *DB) GetReindexRequestDetails(requestID int) ([]*ReindexRequestDetailed, error) {
var details []*ReindexRequestDetailed
_, err := db.Select(&details, getReindexRequestDetails, requestID)
return details, err
}

// Delete a reindex request
func (db *DB) DeleteReindexRequest(requestID int) error {
_, err := db.Exec(deleteReindexRequest, requestID)
Expand All @@ -92,6 +79,49 @@ func (db *DB) DeleteReindexRequestDetail(id int) error {
return err
}

// Get reindex request status
func (db *DB) GetReindexStatus(requestID int) ([]*ReindexRequest, []*ReindexRequestDetailed, string, error) {
// Fetch the latest reindex request
var request []*ReindexRequest
_, err := db.Select(&request, getLatestReindexRequest, requestID)
if err != nil {
return nil, nil, "", errors.Wrap(err, "error fetching reindex request status from db")
}

// Handle case where no records are found
if len(request) == 0 {
return nil, nil, "", errors.New("no reindex request found for the given requestID")
}

// Fetch the latest reindex request details for each index
var details []*ReindexRequestDetailed
_, err = db.Select(&details, getLatestReindexRequestDetails, requestID)
if err != nil {
return nil, nil, "", errors.Wrap(err, "error fetching reindex request details from db")
}

// Prepare the status response
status := map[string]interface{}{
"overall_status": request[0].Status, // Using the latest reindex request status
"indexes": []map[string]string{},
}

for _, detail := range details {
status["indexes"] = append(status["indexes"].([]map[string]string), map[string]string{
"index": detail.Index,
"stage": detail.Stage,
})
}

// Marshal the status to JSON
statusJSON, err := json.Marshal(status)
if err != nil {
return nil, nil, "", errors.Wrap(err, "error marshalling reindex status to JSON")
}

return request, details, string(statusJSON), nil
}

// SQL Queries
const insertReindexRequest = `
INSERT INTO reindex_requests(request_id, status, created_at, last_updated)
Expand All @@ -100,15 +130,22 @@ VALUES ($1, $2, $3, $4);`
const updateReindexRequest = `
UPDATE reindex_requests SET status = $1, last_updated = $2 WHERE request_id = $3;`

const getReindexRequest = `
SELECT request_id, status, created_at, last_updated FROM reindex_requests WHERE request_id = $1;`
const getLatestReindexRequest = `
SELECT request_id, status, created_at, last_updated
FROM reindex_requests
WHERE request_id = $1
ORDER BY last_updated DESC
LIMIT 1;`

const insertReindexRequestDetailed = `
INSERT INTO reindex_request_detailed(request_id, index, from_version, to_version, stage, os_task_id, heartbeat, having_alias, alias_list, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`

const getReindexRequestDetails = `
SELECT id, request_id, index, from_version, to_version, stage, os_task_id, heartbeat, having_alias, alias_list, created_at, updated_at FROM reindex_request_detailed WHERE request_id = $1;`
const getLatestReindexRequestDetails = `
SELECT DISTINCT ON (index) id, request_id, index, from_version, to_version, stage, os_task_id, heartbeat, having_alias, alias_list, created_at, updated_at
FROM reindex_request_detailed
WHERE request_id = $1
ORDER BY index, updated_at DESC;`

const deleteReindexRequest = `
DELETE FROM reindex_requests WHERE request_id = $1;`
Expand Down
Loading
Loading