Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Define simple models for job messages. #19688

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion client/src/api/schema/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8919,6 +8919,22 @@ export interface components {
*/
src: components["schemas"]["DataItemSourceType"];
};
/** ExitCodeJobMessage */
ExitCodeJobMessage: {
/** Code Desc */
code_desc: string | null;
/** Desc */
desc: string | null;
/** Error Level */
error_level: number;
/** Exit Code */
exit_code: number;
/**
* Type
* @constant
*/
type: "exit_code";
};
/** ExportHistoryArchivePayload */
ExportHistoryArchivePayload: {
/**
Expand Down Expand Up @@ -14226,6 +14242,20 @@ export interface components {
*/
source: components["schemas"]["DatasetSourceType"];
};
/** MaxDiscoveredFilesJobMessage */
MaxDiscoveredFilesJobMessage: {
/** Code Desc */
code_desc: string | null;
/** Desc */
desc: string | null;
/** Error Level */
error_level: number;
/**
* Type
* @constant
*/
type: "max_discovered_files";
};
/** MessageExceptionModel */
MessageExceptionModel: {
/** Err Code */
Expand Down Expand Up @@ -15539,6 +15569,24 @@ export interface components {
/** Workflow */
workflow: string;
};
/** RegexJobMessage */
RegexJobMessage: {
/** Code Desc */
code_desc: string | null;
/** Desc */
desc: string | null;
/** Error Level */
error_level: number;
/** Match */
match: string | null;
/** Stream */
stream: string | null;
/**
* Type
* @constant
*/
type: "regex";
};
/** ReloadFeedback */
ReloadFeedback: {
/** Failed */
Expand Down Expand Up @@ -16220,7 +16268,13 @@ export interface components {
* Job Messages
* @description List with additional information and possible reasons for a failed job.
*/
job_messages?: unknown[] | null;
job_messages?:
| (
| components["schemas"]["ExitCodeJobMessage"]
| components["schemas"]["RegexJobMessage"]
| components["schemas"]["MaxDiscoveredFilesJobMessage"]
)[]
| null;
/**
* Job Metrics
* @description Collections of metrics provided by `JobInstrumenter` plugins on a particular job. Only administrators can see these metrics.
Expand Down
21 changes: 20 additions & 1 deletion client/src/components/DatasetInformation/DatasetError.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { createPinia } from "pinia";
import { getLocalVue } from "tests/jest/helpers";

import { HttpResponse, useServerMock } from "@/api/client/__mocks__";
import { type components } from "@/api/schema";
import { useUserStore } from "@/stores/userStore";

import DatasetError from "./DatasetError.vue";
Expand All @@ -15,8 +16,26 @@ const DATASET_ID = "dataset_id";

const { server, http } = useServerMock();

type RegexJobMessage = components["schemas"]["RegexJobMessage"];

async function montDatasetError(has_duplicate_inputs = true, has_empty_inputs = true, user_email = "") {
const pinia = createPinia();
const error1: RegexJobMessage = {
desc: "message_1",
code_desc: null,
stream: null,
match: null,
type: "regex",
error_level: 1,
};
const error2: RegexJobMessage = {
desc: "message_2",
code_desc: null,
stream: null,
match: null,
type: "regex",
error_level: 1,
};

server.use(
http.get("/api/datasets/{dataset_id}", ({ response }) => {
Expand All @@ -35,7 +54,7 @@ async function montDatasetError(has_duplicate_inputs = true, has_empty_inputs =
tool_id: "tool_id",
tool_stderr: "tool_stderr",
job_stderr: "job_stderr",
job_messages: [{ desc: "message_1" }, { desc: "message_2" }],
job_messages: [error1, error2],
user_email,
create_time: "2021-01-01T00:00:00",
update_time: "2021-01-01T00:00:00",
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1973,7 +1973,7 @@ def fail(message=job.info, exception=None):
final_job_state = job.states.ERROR
job.job_messages = [
{
"type": "internal",
"type": "max_discovered_files",
"desc": str(e),
"error_level": StdioErrorLevel.FATAL,
}
Expand Down
5 changes: 2 additions & 3 deletions lib/galaxy/metadata/set_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
from functools import partial
from pathlib import Path
from typing import (
Any,
Dict,
List,
Optional,
)
Expand Down Expand Up @@ -63,6 +61,7 @@
ObjectStore,
)
from galaxy.tool_util.output_checker import (
AnyJobMessage,
check_output,
DETECTED_JOB_STATE,
)
Expand Down Expand Up @@ -224,7 +223,7 @@ def set_meta(new_dataset_instance, file_dict):

export_store = None
final_job_state = Job.states.OK
job_messages: List[Dict[str, Any]] = []
job_messages: List[AnyJobMessage] = []
if extended_metadata_collection:
tool_dict = metadata_params["tool"]
stdio_exit_code_dicts, stdio_regex_dicts = tool_dict["stdio_exit_codes"], tool_dict["stdio_regexes"]
Expand Down
17 changes: 13 additions & 4 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@
from galaxy.security import get_permitted_actions
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.security.validate_user_input import validate_password_str
from galaxy.tool_util.output_checker import AnyJobMessage
from galaxy.util import (
directory_hash_id,
enum_values,
Expand Down Expand Up @@ -562,6 +563,7 @@ def cached_id(galaxy_model_object):


class JobLike:
job_messages: Mapped[Optional[List[AnyJobMessage]]]
MAX_NUMERIC = 10 ** (JOB_METRIC_PRECISION - JOB_METRIC_SCALE) - 1

def _init_metrics(self):
Expand Down Expand Up @@ -596,7 +598,14 @@ def metrics(self):
# TODO: Make iterable, concatenate with chain
return self.text_metrics + self.numeric_metrics

def set_streams(self, tool_stdout, tool_stderr, job_stdout=None, job_stderr=None, job_messages=None):
def set_streams(
self,
tool_stdout,
tool_stderr,
job_stdout=None,
job_stderr=None,
job_messages: Optional[List[AnyJobMessage]] = None,
):
def shrink_and_unicodify(what, stream):
if stream and len(stream) > galaxy.util.DATABASE_MAX_STRING_SIZE:
log.info(
Expand All @@ -621,7 +630,7 @@ def shrink_and_unicodify(what, stream):
self.job_stderr = None

if job_messages is not None:
self.job_messages = job_messages
self.job_messages = cast(Optional[List[AnyJobMessage]], job_messages)

def log_str(self):
extra = ""
Expand Down Expand Up @@ -1485,7 +1494,7 @@ class Job(Base, JobLike, UsesCreateAndUpdateTime, Dictifiable, Serializable):
copied_from_job_id: Mapped[Optional[int]]
command_line: Mapped[Optional[str]] = mapped_column(TEXT)
dependencies: Mapped[Optional[bytes]] = mapped_column(MutableJSONType)
job_messages: Mapped[Optional[bytes]] = mapped_column(MutableJSONType)
job_messages: Mapped[Optional[List[AnyJobMessage]]] = mapped_column(MutableJSONType)
param_filename: Mapped[Optional[str]] = mapped_column(String(1024))
runner_name: Mapped[Optional[str]] = mapped_column(String(255))
job_stdout: Mapped[Optional[str]] = mapped_column(TEXT)
Expand Down Expand Up @@ -2260,7 +2269,7 @@ class Task(Base, JobLike, RepresentById):
tool_stdout: Mapped[Optional[str]] = mapped_column(TEXT)
tool_stderr: Mapped[Optional[str]] = mapped_column(TEXT)
exit_code: Mapped[Optional[int]]
job_messages: Mapped[Optional[bytes]] = mapped_column(MutableJSONType)
job_messages: Mapped[Optional[List[AnyJobMessage]]] = mapped_column(MutableJSONType)
info: Mapped[Optional[str]] = mapped_column(TrimmedString(255))
traceback: Mapped[Optional[str]] = mapped_column(TEXT)
job_id: Mapped[int] = mapped_column(ForeignKey("job.id"), index=True)
Expand Down
52 changes: 0 additions & 52 deletions lib/galaxy/schema/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
DataItemSourceType,
EncodedDataItemSourceId,
EncodedJobParameterHistoryItem,
JobMetricCollection,
JobState,
JobSummary,
Model,
Expand Down Expand Up @@ -248,54 +247,3 @@ class JobDisplayParametersSummary(Model):
title="Outputs",
description="Dictionary mapping all the tool outputs (by name) with the corresponding dataset information in a nested format.",
)


class ShowFullJobResponse(EncodedJobDetails):
tool_stdout: Optional[str] = Field(
default=None,
title="Tool Standard Output",
description="The captured standard output of the tool executed by the job.",
)
tool_stderr: Optional[str] = Field(
default=None,
title="Tool Standard Error",
description="The captured standard error of the tool executed by the job.",
)
job_stdout: Optional[str] = Field(
default=None,
title="Job Standard Output",
description="The captured standard output of the job execution.",
)
job_stderr: Optional[str] = Field(
default=None,
title="Job Standard Error",
description="The captured standard error of the job execution.",
)
stdout: Optional[str] = Field( # Redundant? it seems to be (tool_stdout + "\n" + job_stdout)
default=None,
title="Standard Output",
description="Combined tool and job standard output streams.",
)
stderr: Optional[str] = Field( # Redundant? it seems to be (tool_stderr + "\n" + job_stderr)
default=None,
title="Standard Error",
description="Combined tool and job standard error streams.",
)
job_messages: Optional[List[Any]] = Field(
default=None,
title="Job Messages",
description="List with additional information and possible reasons for a failed job.",
)
dependencies: Optional[List[Any]] = Field(
default=None,
title="Job dependencies",
description="The dependencies of the job.",
)
job_metrics: Optional[JobMetricCollection] = Field(
default=None,
title="Job Metrics",
description=(
"Collections of metrics provided by `JobInstrumenter` plugins on a particular job. "
"Only administrators can see these metrics."
),
)
48 changes: 40 additions & 8 deletions lib/galaxy/tool_util/output_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@
from enum import Enum
from logging import getLogger
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
TYPE_CHECKING,
Union,
)

from typing_extensions import (
Literal,
TypedDict,
)

from galaxy.tool_util.parser.stdio import StdioErrorLevel
Expand All @@ -29,8 +34,35 @@ class DETECTED_JOB_STATE(str, Enum):
ERROR_PEEK_SIZE = 2000


# Discriminator values for the ``type`` field of every job message variant.
JobMessageTypeLiteral = Literal["regex", "exit_code", "max_discovered_files"]


class JobMessage(TypedDict):
    """Base shape shared by all job message variants (see ``AnyJobMessage``)."""

    # Human-readable description of the detected problem.
    desc: Optional[str]
    # Description attached to the matching stdio rule, if any.
    code_desc: Optional[str]
    # StdioErrorLevel value for the message; declared as float because the
    # level 1.1 exists. Literal[0, 1, 1.1, 2, 3, 4] - mypy doesn't like literal floats.
    error_level: float


class RegexJobMessage(JobMessage):
    """Message produced when a tool stdio regex rule matches tool output."""

    type: Literal["regex"]
    # Name of the stream the regex matched against (e.g. stdout/stderr)
    # — see the stream_name argument of check_output_regex.
    stream: Optional[str]
    # The matched text, if captured.
    match: Optional[str]


class ExitCodeJobMessage(JobMessage):
    """Message produced when the tool's exit code matches a stdio exit-code rule."""

    type: Literal["exit_code"]
    # The tool process exit code that triggered the rule.
    exit_code: int


class MaxDiscoveredFilesJobMessage(JobMessage):
    """Message produced when a job exceeds the discovered-files limit."""

    type: Literal["max_discovered_files"]


# Discriminated union of all job message shapes; narrow on the ``type`` field.
AnyJobMessage = Union[ExitCodeJobMessage, RegexJobMessage, MaxDiscoveredFilesJobMessage]


def check_output_regex(
regex: "ToolStdioRegex", stream: str, stream_name: str, job_messages: List[Dict[str, Any]], max_error_level: int
regex: "ToolStdioRegex", stream: str, stream_name: str, job_messages: List[AnyJobMessage], max_error_level: int
) -> int:
"""
check a single regex against a stream
Expand All @@ -55,10 +87,10 @@ def check_output(
stdout: str,
stderr: str,
tool_exit_code: int,
) -> Tuple[str, str, str, List[Dict[str, Any]]]:
) -> Tuple[str, str, str, List[AnyJobMessage]]:
"""
Check the output of a tool - given the stdout, stderr, and the tool's
exit code, return DETECTED_JOB_STATE.OK if the tool exited succesfully or
exit code, return DETECTED_JOB_STATE.OK if the tool exited successfully or
error type otherwise. No exceptions should be thrown. If this code encounters
an exception, it returns OK so that the workflow can continue;
otherwise, a bug in this code could halt workflow progress.
Expand All @@ -77,7 +109,7 @@ def check_output(
# messages are added in the order of detection

# If job is failed, track why.
job_messages = []
job_messages: List[AnyJobMessage] = []

try:
# Check exit codes and match regular expressions against stdout and
Expand All @@ -103,7 +135,7 @@ def check_output(
if None is code_desc:
code_desc = ""
desc = f"{StdioErrorLevel.desc(stdio_exit_code.error_level)}: Exit code {tool_exit_code} ({code_desc})"
reason = {
reason: ExitCodeJobMessage = {
"type": "exit_code",
"desc": desc,
"exit_code": tool_exit_code,
Expand Down Expand Up @@ -168,7 +200,7 @@ def check_output(
return state, stdout, stderr, job_messages


def __regex_err_msg(match: re.Match, stream: str, regex: "ToolStdioRegex"):
def __regex_err_msg(match: re.Match, stream: str, regex: "ToolStdioRegex") -> RegexJobMessage:
"""
Return a message about the match on tool output using the given
ToolStdioRegex regex object. The regex_match is a MatchObject
Expand Down
Loading
Loading