-
Notifications
You must be signed in to change notification settings - Fork 2.6k
/
Copy pathecs.go
165 lines (134 loc) · 6.16 KB
/
ecs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package ecs // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/ecs"
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/processor"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil/endpoints"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
)
const (
// TypeStr is type of detector.
TypeStr = "ecs"
)
var _ internal.Detector = (*Detector)(nil)
type Detector struct {
provider ecsutil.MetadataProvider
}
func NewDetector(params processor.CreateSettings, _ internal.DetectorConfig) (internal.Detector, error) {
provider, err := ecsutil.NewDetectedTaskMetadataProvider(params.TelemetrySettings)
if err != nil {
// Allow metadata provider to be created in incompatible environments and just have a noop Detect()
var errNTMED endpoints.ErrNoTaskMetadataEndpointDetected
if errors.As(err, &errNTMED) {
return &Detector{provider: nil}, nil
}
return nil, fmt.Errorf("unable to create task metadata provider: %w", err)
}
return &Detector{provider: provider}, nil
}
// Detect records metadata retrieved from the ECS Task Metadata Endpoint (TMDE) as resource attributes
// TODO(willarmiros): Replace all attribute fields and enums with values defined in "conventions" once they exist
func (d *Detector) Detect(context.Context) (resource pcommon.Resource, schemaURL string, err error) {
res := pcommon.NewResource()
// don't attempt to fetch metadata if there's no provider (incompatible env)
if d.provider == nil {
return res, "", nil
}
tmdeResp, err := d.provider.FetchTaskMetadata()
if err != nil || tmdeResp == nil {
return res, "", fmt.Errorf("unable to fetch task metadata: %w", err)
}
attr := res.Attributes()
attr.PutStr(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAWS)
attr.PutStr(conventions.AttributeCloudPlatform, conventions.AttributeCloudPlatformAWSECS)
attr.PutStr(conventions.AttributeAWSECSTaskARN, tmdeResp.TaskARN)
attr.PutStr(conventions.AttributeAWSECSTaskFamily, tmdeResp.Family)
attr.PutStr(conventions.AttributeAWSECSTaskRevision, tmdeResp.Revision)
region, account := parseRegionAndAccount(tmdeResp.TaskARN)
if account != "" {
attr.PutStr(conventions.AttributeCloudAccountID, account)
}
if region != "" {
attr.PutStr(conventions.AttributeCloudRegion, region)
}
// TMDE returns the cluster short name or ARN, so we need to construct the ARN if necessary
attr.PutStr(conventions.AttributeAWSECSClusterARN, constructClusterArn(tmdeResp.Cluster, region, account))
// The Availability Zone is not available in all Fargate runtimes
if tmdeResp.AvailabilityZone != "" {
attr.PutStr(conventions.AttributeCloudAvailabilityZone, tmdeResp.AvailabilityZone)
}
// The launch type and log data attributes are only available in TMDE v4
switch lt := strings.ToLower(tmdeResp.LaunchType); lt {
case "ec2":
attr.PutStr(conventions.AttributeAWSECSLaunchtype, "ec2")
case "fargate":
attr.PutStr(conventions.AttributeAWSECSLaunchtype, "fargate")
}
selfMetaData, err := d.provider.FetchContainerMetadata()
if err != nil || selfMetaData == nil {
return res, "", err
}
addValidLogData(tmdeResp.Containers, selfMetaData, account, attr)
return res, conventions.SchemaURL, nil
}
func constructClusterArn(cluster, region, account string) string {
// If cluster is already an ARN, return it
if bytes.IndexByte([]byte(cluster), byte(':')) != -1 {
return cluster
}
return fmt.Sprintf("arn:aws:ecs:%s:%s:cluster/%s", region, account, cluster)
}
// Parses the AWS Account ID and AWS Region from a task ARN
// See: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-account-settings.html#ecs-resource-ids
func parseRegionAndAccount(taskARN string) (region string, account string) {
parts := strings.Split(taskARN, ":")
if len(parts) >= 5 {
return parts[3], parts[4]
}
return "", ""
}
// Filter out non-normal containers, our own container since we assume the collector is run as a sidecar,
// "init" containers which only run at startup then shutdown (as indicated by the "KnownStatus" attribute),
// containers not using AWS Logs, and those without log group metadata to get the final lists of valid log data
// See: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4.html#task-metadata-endpoint-v4-response
func addValidLogData(containers []ecsutil.ContainerMetadata, self *ecsutil.ContainerMetadata, account string, dest pcommon.Map) {
initialized := false
var logGroupNames pcommon.Slice
var logGroupArns pcommon.Slice
var logStreamNames pcommon.Slice
var logStreamArns pcommon.Slice
for _, container := range containers {
logData := container.LogOptions
if container.Type == "NORMAL" &&
container.KnownStatus == "RUNNING" &&
container.LogDriver == "awslogs" &&
self.DockerID != container.DockerID &&
logData != (ecsutil.LogOptions{}) {
if !initialized {
logGroupNames = dest.PutEmptySlice(conventions.AttributeAWSLogGroupNames)
logGroupArns = dest.PutEmptySlice(conventions.AttributeAWSLogGroupARNs)
logStreamNames = dest.PutEmptySlice(conventions.AttributeAWSLogStreamNames)
logStreamArns = dest.PutEmptySlice(conventions.AttributeAWSLogStreamARNs)
initialized = true
}
logGroupNames.AppendEmpty().SetStr(logData.LogGroup)
logGroupArns.AppendEmpty().SetStr(constructLogGroupArn(logData.Region, account, logData.LogGroup))
logStreamNames.AppendEmpty().SetStr(logData.Stream)
logStreamArns.AppendEmpty().SetStr(constructLogStreamArn(logData.Region, account, logData.LogGroup, logData.Stream))
}
}
}
func constructLogGroupArn(region, account, group string) string {
return fmt.Sprintf("arn:aws:logs:%s:%s:log-group:%s", region, account, group)
}
func constructLogStreamArn(region, account, group, stream string) string {
return fmt.Sprintf("%s:log-stream:%s", constructLogGroupArn(region, account, group), stream)
}