Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit b0684d9

Browse files
Add timestamps to task log plugin structure (#183)
1 parent 6996bb8 commit b0684d9

File tree

7 files changed

+353
-61
lines changed

7 files changed

+353
-61
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/aws/aws-sdk-go-v2/config v1.0.0
1414
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
1515
github.com/coocood/freecache v1.1.1
16-
github.com/flyteorg/flyteidl v0.18.48
16+
github.com/flyteorg/flyteidl v0.19.2
1717
github.com/flyteorg/flytestdlib v0.3.13
1818
github.com/go-logr/zapr v0.4.0 // indirect
1919
github.com/go-test/deep v1.0.7

go.sum

+227
Large diffs are not rendered by default.

go/tasks/logs/logging_utils.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package logs
33
import (
44
"context"
55
"fmt"
6+
"time"
67

78
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/tasklog"
89

@@ -44,11 +45,13 @@ func GetLogsForContainerInPod(ctx context.Context, pod *v1.Pod, index uint32, na
4445

4546
logs, err := logPlugin.GetTaskLogs(
4647
tasklog.Input{
47-
PodName: pod.Name,
48-
Namespace: pod.Namespace,
49-
ContainerName: pod.Spec.Containers[index].Name,
50-
ContainerID: pod.Status.ContainerStatuses[index].ContainerID,
51-
LogName: nameSuffix,
48+
PodName: pod.Name,
49+
Namespace: pod.Namespace,
50+
ContainerName: pod.Spec.Containers[index].Name,
51+
ContainerID: pod.Status.ContainerStatuses[index].ContainerID,
52+
LogName: nameSuffix,
53+
PodUnixStartTime: pod.CreationTimestamp.Unix(),
54+
PodUnixFinishTime: time.Now().Unix(),
5255
},
5356
)
5457

go/tasks/pluginmachinery/tasklog/plugin.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ import "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
55
// Input contains all available information about task's execution that a log plugin can use to construct task's
66
// log links.
77
type Input struct {
8-
HostName string `json:"hostname"`
9-
PodName string `json:"podName"`
10-
Namespace string `json:"namespace"`
11-
ContainerName string `json:"containerName"`
12-
ContainerID string `json:"containerId"`
13-
LogName string `json:"logName"`
8+
HostName string `json:"hostname"`
9+
PodName string `json:"podName"`
10+
Namespace string `json:"namespace"`
11+
ContainerName string `json:"containerName"`
12+
ContainerID string `json:"containerId"`
13+
LogName string `json:"logName"`
14+
PodUnixStartTime int64 `json:"podUnixStartTime"`
15+
PodUnixFinishTime int64 `json:"podUnixFinishTime"`
1416
}
1517

1618
// Output contains all task logs a plugin generates for a given Input.

go/tasks/pluginmachinery/tasklog/template.go

+35-18
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tasklog
33
import (
44
"fmt"
55
"regexp"
6+
"strconv"
67
"strings"
78

89
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
@@ -15,6 +16,8 @@ import (
1516
// {{ .containerId }}: The container id docker/crio generated at run time,
1617
// {{ .logName }}: A deployment specific name where to expect the logs to be.
1718
// {{ .hostname }}: The hostname where the pod is running and where logs reside.
19+
// {{ .podUnixStartTime }}: The pod creation time (in unix seconds, not millis)
20+
// {{ .podUnixFinishTime }}: Don't have a good mechanism for this yet, but approximating with time.Now for now
1821
type TemplateLogPlugin struct {
1922
templateUris []string
2023
messageFormat core.TaskLog_MessageFormat
@@ -26,22 +29,26 @@ type regexValPair struct {
2629
}
2730

2831
type templateRegexes struct {
29-
PodName *regexp.Regexp
30-
Namespace *regexp.Regexp
31-
ContainerName *regexp.Regexp
32-
ContainerID *regexp.Regexp
33-
LogName *regexp.Regexp
34-
Hostname *regexp.Regexp
32+
PodName *regexp.Regexp
33+
Namespace *regexp.Regexp
34+
ContainerName *regexp.Regexp
35+
ContainerID *regexp.Regexp
36+
LogName *regexp.Regexp
37+
Hostname *regexp.Regexp
38+
PodUnixStartTime *regexp.Regexp
39+
PodUnixFinishTime *regexp.Regexp
3540
}
3641

3742
func mustInitTemplateRegexes() templateRegexes {
3843
return templateRegexes{
39-
PodName: mustCreateRegex("podName"),
40-
Namespace: mustCreateRegex("namespace"),
41-
ContainerName: mustCreateRegex("containerName"),
42-
ContainerID: mustCreateRegex("containerID"),
43-
LogName: mustCreateRegex("logName"),
44-
Hostname: mustCreateRegex("hostname"),
44+
PodName: mustCreateRegex("podName"),
45+
Namespace: mustCreateRegex("namespace"),
46+
ContainerName: mustCreateRegex("containerName"),
47+
ContainerID: mustCreateRegex("containerID"),
48+
LogName: mustCreateRegex("logName"),
49+
Hostname: mustCreateRegex("hostname"),
50+
PodUnixStartTime: mustCreateRegex("podUnixStartTime"),
51+
PodUnixFinishTime: mustCreateRegex("podUnixFinishTime"),
4552
}
4653
}
4754

@@ -59,13 +66,15 @@ func replaceAll(template string, values []regexValPair) string {
5966
return template
6067
}
6168

62-
func (s TemplateLogPlugin) GetTaskLog(podName, namespace, containerName, containerID, logName string) (core.TaskLog, error) {
69+
func (s TemplateLogPlugin) GetTaskLog(podName, namespace, containerName, containerID, logName string, podUnixStartTime, podUnixFinishTime int64) (core.TaskLog, error) {
6370
o, err := s.GetTaskLogs(Input{
64-
LogName: logName,
65-
Namespace: namespace,
66-
PodName: podName,
67-
ContainerName: containerName,
68-
ContainerID: containerID,
71+
LogName: logName,
72+
Namespace: namespace,
73+
PodName: podName,
74+
ContainerName: containerName,
75+
ContainerID: containerID,
76+
PodUnixStartTime: podUnixStartTime,
77+
PodUnixFinishTime: podUnixFinishTime,
6978
})
7079

7180
if err != nil || len(o.TaskLogs) == 0 {
@@ -114,6 +123,14 @@ func (s TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) {
114123
regex: regexes.Hostname,
115124
val: input.HostName,
116125
},
126+
{
127+
regex: regexes.PodUnixStartTime,
128+
val: strconv.FormatInt(input.PodUnixStartTime, 10),
129+
},
130+
{
131+
regex: regexes.PodUnixFinishTime,
132+
val: strconv.FormatInt(input.PodUnixFinishTime, 10),
133+
},
117134
},
118135
),
119136
Name: input.LogName,

go/tasks/pluginmachinery/tasklog/template_test.go

+70-28
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ func TestTemplateLog(t *testing.T) {
1717
"flyteexamples-production",
1818
"spark-kubernetes-driver",
1919
"cri-o://abc",
20-
"main_logs")
20+
"main_logs",
21+
1426349294,
22+
1623782877,
23+
)
2124
assert.NoError(t, err)
2225
assert.Equal(t, tl.GetName(), "main_logs")
2326
assert.Equal(t, tl.GetMessageFormat(), core.TaskLog_JSON)
@@ -37,11 +40,13 @@ func Test_templateLogPlugin_Regression(t *testing.T) {
3740
messageFormat core.TaskLog_MessageFormat
3841
}
3942
type args struct {
40-
podName string
41-
namespace string
42-
containerName string
43-
containerID string
44-
logName string
43+
podName string
44+
namespace string
45+
containerName string
46+
containerID string
47+
logName string
48+
podUnixStartTime int64
49+
podUnixFinishTime int64
4550
}
4651
tests := []struct {
4752
name string
@@ -57,11 +62,13 @@ func Test_templateLogPlugin_Regression(t *testing.T) {
5762
messageFormat: core.TaskLog_JSON,
5863
},
5964
args{
60-
podName: "f-uuid-driver",
61-
namespace: "flyteexamples-production",
62-
containerName: "spark-kubernetes-driver",
63-
containerID: "cri-o://abc",
64-
logName: "main_logs",
65+
podName: "f-uuid-driver",
66+
namespace: "flyteexamples-production",
67+
containerName: "spark-kubernetes-driver",
68+
containerID: "cri-o://abc",
69+
logName: "main_logs",
70+
podUnixStartTime: 123,
71+
podUnixFinishTime: 12345,
6572
},
6673
core.TaskLog{
6774
Uri: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.f-uuid-driver_flyteexamples-production_spark-kubernetes-driver-abc.log",
@@ -77,11 +84,13 @@ func Test_templateLogPlugin_Regression(t *testing.T) {
7784
messageFormat: core.TaskLog_JSON,
7885
},
7986
args{
80-
podName: "podName",
81-
namespace: "flyteexamples-production",
82-
containerName: "spark-kubernetes-driver",
83-
containerID: "cri-o://abc",
84-
logName: "main_logs",
87+
podName: "podName",
88+
namespace: "flyteexamples-production",
89+
containerName: "spark-kubernetes-driver",
90+
containerID: "cri-o://abc",
91+
logName: "main_logs",
92+
podUnixStartTime: 123,
93+
podUnixFinishTime: 12345,
8594
},
8695
core.TaskLog{
8796
Uri: "https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3DpodName",
@@ -97,11 +106,13 @@ func Test_templateLogPlugin_Regression(t *testing.T) {
97106
messageFormat: core.TaskLog_JSON,
98107
},
99108
args{
100-
podName: "flyteexamples-development-task-name",
101-
namespace: "flyteexamples-development",
102-
containerName: "ignore",
103-
containerID: "ignore",
104-
logName: "main_logs",
109+
podName: "flyteexamples-development-task-name",
110+
namespace: "flyteexamples-development",
111+
containerName: "ignore",
112+
containerID: "ignore",
113+
logName: "main_logs",
114+
podUnixStartTime: 123,
115+
podUnixFinishTime: 12345,
105116
},
106117
core.TaskLog{
107118
Uri: "https://dashboard.k8s.net/#!/log/flyteexamples-development/flyteexamples-development-task-name/pod?namespace=flyteexamples-development",
@@ -118,7 +129,7 @@ func Test_templateLogPlugin_Regression(t *testing.T) {
118129
messageFormat: tt.fields.messageFormat,
119130
}
120131

121-
got, err := s.GetTaskLog(tt.args.podName, tt.args.namespace, tt.args.containerName, tt.args.containerID, tt.args.logName)
132+
got, err := s.GetTaskLog(tt.args.podName, tt.args.namespace, tt.args.containerName, tt.args.containerID, tt.args.logName, tt.args.podUnixStartTime, tt.args.podUnixFinishTime)
122133
if (err != nil) != tt.wantErr {
123134
t.Errorf("GetTaskLog() error = %v, wantErr %v", err, tt.wantErr)
124135
return
@@ -154,12 +165,14 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) {
154165
},
155166
args{
156167
input: Input{
157-
HostName: "my-host",
158-
PodName: "my-pod",
159-
Namespace: "my-namespace",
160-
ContainerName: "my-container",
161-
ContainerID: "ignore",
162-
LogName: "main_logs",
168+
HostName: "my-host",
169+
PodName: "my-pod",
170+
Namespace: "my-namespace",
171+
ContainerName: "my-container",
172+
ContainerID: "ignore",
173+
LogName: "main_logs",
174+
PodUnixStartTime: 123,
175+
PodUnixFinishTime: 12345,
163176
},
164177
},
165178
Output{
@@ -173,6 +186,35 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) {
173186
},
174187
false,
175188
},
189+
{
190+
"ddog",
191+
fields{
192+
templateURI: "https://app.datadoghq.com/logs?event&from_ts={{ .podUnixStartTime }}&live=true&query=pod_name%3A{{ .podName }}&to_ts={{ .podUnixFinishTime }}",
193+
messageFormat: core.TaskLog_JSON,
194+
},
195+
args{
196+
input: Input{
197+
HostName: "my-host",
198+
PodName: "my-pod",
199+
Namespace: "my-namespace",
200+
ContainerName: "my-container",
201+
ContainerID: "ignore",
202+
LogName: "main_logs",
203+
PodUnixStartTime: 123,
204+
PodUnixFinishTime: 12345,
205+
},
206+
},
207+
Output{
208+
TaskLogs: []*core.TaskLog{
209+
{
210+
Uri: "https://app.datadoghq.com/logs?event&from_ts=123&live=true&query=pod_name%3Amy-pod&to_ts=12345",
211+
MessageFormat: core.TaskLog_JSON,
212+
Name: "main_logs",
213+
},
214+
},
215+
},
216+
false,
217+
},
176218
}
177219
for _, tt := range tests {
178220
t.Run(tt.name, func(t *testing.T) {

go/tasks/plugins/array/k8s/monitor.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,10 @@ func FetchPodStatusAndLogs(ctx context.Context, client core.KubeClient, name k8s
222222

223223
if logPlugin != nil {
224224
o, err := logPlugin.GetTaskLogs(tasklog.Input{
225-
PodName: pod.Name,
226-
Namespace: pod.Namespace,
227-
LogName: fmt.Sprintf(" #%d-%d", index, retryAttempt),
225+
PodName: pod.Name,
226+
Namespace: pod.Namespace,
227+
LogName: fmt.Sprintf(" #%d-%d", index, retryAttempt),
228+
PodUnixStartTime: pod.CreationTimestamp.Unix(),
228229
})
229230

230231
if err != nil {

0 commit comments

Comments
 (0)