Skip to content

Commit 4d113c3

Browse files
committed
feat: add stream name to record
1 parent a377ded commit 4d113c3

15 files changed

+76
-25
lines changed

benchmarks/add_static_value_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func heavy_static_column_stages(number_of_stages int, number_of_records int) {
1818
output := make(chan Record)
1919
errors := make(chan error)
2020

21-
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(),
21+
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(), "test-stream",
2222
core.WithLogLevel(slog.LevelError))
2323

2424
// Create stages

benchmarks/filter_benchmark_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func heavy_filter_stages(number_of_stages int, number_of_records int) {
1818
output := make(chan Record)
1919
errors := make(chan error)
2020

21-
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(),
21+
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(), "test-stream",
2222
core.WithLogLevel(slog.LevelError))
2323

2424
// Create stages

benchmarks/rename_benchmark_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func heavy_rename_column_stages(number_of_records int) {
1717
output := make(chan Record)
1818
errors := make(chan error)
1919

20-
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(),
20+
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(), "test-stream",
2121
core.WithLogLevel(slog.LevelError))
2222

2323
// Create stages

benchmarks/schema_validation_benchmark_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func heavy_schema_validation_stages(number_of_records int) {
1717
output := make(chan Record)
1818
errors := make(chan error)
1919

20-
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(),
20+
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(), "test-stream",
2121
core.WithLogLevel(slog.LevelError))
2222

2323
go func() {

benchmarks/select_benchmark_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func heavy_select_stages(number_of_stages int, number_of_records int) {
1717
output := make(chan Record)
1818
errors := make(chan error)
1919

20-
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(),
20+
sdf := core.NewStreamDataFrame(input, output, errors, utils.HeavyRecordSchema(), "test-stream",
2121
core.WithLogLevel(slog.LevelError))
2222

2323
result_df := sdf.Select("field_1", "field_2", "field_3", "field_4",

pkg/core/config_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
)
1111

1212
func TestStreamDataFrame_ConfigDefaultValues_DefaultValuesAssignedCorrectly(t *testing.T) {
13-
sdf := NewStreamDataFrame(nil, nil, nil, types.Schema{})
13+
sdf := NewStreamDataFrame(nil, nil, nil, types.Schema{}, "test-stream")
1414

1515
default_config := Config{
1616
LogLevel: slog.LevelInfo,
@@ -20,7 +20,7 @@ func TestStreamDataFrame_ConfigDefaultValues_DefaultValuesAssignedCorrectly(t *t
2020
}
2121

2222
func TestStreamDataFrame_ConfigWithLogLevel_LogLevelAssignedCorrectly(t *testing.T) {
23-
sdf := NewStreamDataFrame(nil, nil, nil, types.Schema{},
23+
sdf := NewStreamDataFrame(nil, nil, nil, types.Schema{}, "test-stream",
2424
WithLogLevel(slog.LevelError),
2525
)
2626

pkg/core/stream_dataframe.go

+6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type StreamDataFrame struct {
2020
SourceStream chan (types.Record)
2121
OutputStream chan (types.Record)
2222
ErrorStream chan (error)
23+
Name string
2324
Stages []Stage
2425
Schema types.Schema
2526
Configs *Config
@@ -31,6 +32,7 @@ func NewStreamDataFrame(
3132
outputStream chan (types.Record),
3233
errorStream chan error,
3334
schema types.Schema,
35+
streamName string,
3436
options ...Option,
3537
) StreamDataFrame {
3638
// Create config with default values
@@ -46,6 +48,7 @@ func NewStreamDataFrame(
4648
SourceStream: sourceStream,
4749
OutputStream: outputStream,
4850
ErrorStream: errorStream,
51+
Name: streamName,
4952
Stages: []Stage{},
5053
Schema: schema,
5154
Configs: &config,
@@ -159,6 +162,9 @@ func (sdf *StreamDataFrame) validateSchema() DataFrame {
159162
sdf.ErrorStream <- err
160163
}
161164

165+
// If schema was valid, add name of the stream record belongs to
166+
data.Metadata.Stream = sdf.Name
167+
162168
return []types.Record{data}, nil
163169
}
164170

pkg/core/stream_dataframe_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func TestSreamDataFrame_AddStage_FirstStage(t *testing.T) {
1616
schema := Schema{
1717
Columns: Fields{},
1818
}
19-
sdf := NewStreamDataFrame(input, output, errors, schema)
19+
sdf := NewStreamDataFrame(input, output, errors, schema, "test-stream")
2020
executor := func(ctx context.Context, data Record) ([]Record, error) {
2121
return nil, nil
2222
}
@@ -39,7 +39,7 @@ func TestSreamDataFrame_AddStage_ChainStages(t *testing.T) {
3939
schema := Schema{
4040
Columns: Fields{},
4141
}
42-
sdf := NewStreamDataFrame(input, output, errors, schema)
42+
sdf := NewStreamDataFrame(input, output, errors, schema, "test-stream")
4343
executor := func(ctx context.Context, data Record) ([]Record, error) {
4444
return nil, nil
4545
}
@@ -82,7 +82,7 @@ func TestSreamDataFrame_Execute_CancellingContextStopsExecution(t *testing.T) {
8282
schema := Schema{
8383
Columns: Fields{},
8484
}
85-
sdf := NewStreamDataFrame(input, output, errors, schema)
85+
sdf := NewStreamDataFrame(input, output, errors, schema, "test-stream")
8686

8787
ctx := context.Background()
8888
ctx, cancel := context.WithCancel(ctx)

pkg/functions/add_column_test.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestAddStaticColumn_AddIntegerField_AddColumnToSchemaAndRecords(t *testing.
9595
"last_name": StringType,
9696
},
9797
}
98-
sdf := core.NewStreamDataFrame(input, output, errors, schema)
98+
sdf := core.NewStreamDataFrame(input, output, errors, schema, "test-stream")
9999

100100
// Logic to test
101101
result_df := sdf.AddStaticColumn("age", Integer{Val: 10})
@@ -142,6 +142,9 @@ func TestAddStaticColumn_AddIntegerField_AddColumnToSchemaAndRecords(t *testing.
142142
"last_name": String{Val: "random_lastname"},
143143
"age": Integer{Val: 10},
144144
},
145+
Metadata: Metadata{
146+
Stream: "test-stream",
147+
},
145148
},
146149
{
147150
Key: "key2",
@@ -150,6 +153,9 @@ func TestAddStaticColumn_AddIntegerField_AddColumnToSchemaAndRecords(t *testing.
150153
"last_name": String{Val: "random_lastname"},
151154
"age": Integer{Val: 10},
152155
},
156+
Metadata: Metadata{
157+
Stream: "test-stream",
158+
},
153159
},
154160
{
155161
Key: "key3",
@@ -158,6 +164,9 @@ func TestAddStaticColumn_AddIntegerField_AddColumnToSchemaAndRecords(t *testing.
158164
"last_name": String{Val: "random_lastname2"},
159165
"age": Integer{Val: 10},
160166
},
167+
Metadata: Metadata{
168+
Stream: "test-stream",
169+
},
161170
},
162171
}
163172

@@ -192,7 +201,7 @@ func TestAddStaticColumn_AddAlreadyExistingColumn_PanicsWithAlreadyExistsColumn(
192201
"age": IntType,
193202
},
194203
}
195-
sdf := core.NewStreamDataFrame(input, output, errors, schema)
204+
sdf := core.NewStreamDataFrame(input, output, errors, schema, "test-stream")
196205

197206
assert.Panicsf(t,
198207
func() {

pkg/functions/filter_test.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func TestFilter_WithDataFrame_AcceptRelatedRecord(t *testing.T) {
9696
"last_name": StringType,
9797
},
9898
}
99-
sdf := core.NewStreamDataFrame(input, output, errors, schema)
99+
sdf := core.NewStreamDataFrame(input, output, errors, schema, "test-stream")
100100

101101
// Logic to test
102102
result_df := sdf.Filter(functions.Filter{
@@ -112,6 +112,9 @@ func TestFilter_WithDataFrame_AcceptRelatedRecord(t *testing.T) {
112112
"first_name": String{Val: "foobar"},
113113
"last_name": String{Val: "random_lastname"},
114114
},
115+
Metadata: Metadata{
116+
Stream: "test-stream",
117+
},
115118
}
116119
go func() {
117120
input <- Record{
@@ -154,7 +157,7 @@ func TestFilter_WithChainedDataFrame_AcceptRelatedRecord(t *testing.T) {
154157
"email": StringType,
155158
},
156159
}
157-
sdf := core.NewStreamDataFrame(input, output, errors, schema)
160+
sdf := core.NewStreamDataFrame(input, output, errors, schema, "test-stream")
158161

159162
// Logic to test
160163
result_df := sdf.Filter(functions.Filter{
@@ -179,6 +182,9 @@ func TestFilter_WithChainedDataFrame_AcceptRelatedRecord(t *testing.T) {
179182
"last_name": String{Val: "bar"},
180183
"email": String{Val: "random_email"},
181184
},
185+
Metadata: Metadata{
186+
Stream: "test-stream",
187+
},
182188
}
183189
go func() {
184190
input <- Record{

pkg/functions/join/join.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,16 @@ const (
88
Left
99
)
1010

11+
// JoinMode specifies the way to see input streams
12+
type JoinMode int
13+
14+
const (
15+
StreamTable JoinMode = iota
16+
StreamStream
17+
)
18+
1119
// JoinCondition specifies conditions that should be met in order for a join to happen
1220
type JoinCondition struct {
13-
Left string
14-
Right string
21+
LeftKey string
22+
RightKey string
1523
}

pkg/functions/rename_column_test.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func TestRename_ValidNames_ColumnIsRenamedInSchemaAndRecords(t *testing.T) {
116116
"age": IntType,
117117
},
118118
}
119-
sdf := core.NewStreamDataFrame(input, output, errors, schema)
119+
sdf := core.NewStreamDataFrame(input, output, errors, schema, "test-stream")
120120

121121
// Logic to test
122122
result_df := sdf.Rename("last_name", "family_name")
@@ -166,6 +166,9 @@ func TestRename_ValidNames_ColumnIsRenamedInSchemaAndRecords(t *testing.T) {
166166
"family_name": String{Val: "random_lastname"},
167167
"age": Integer{Val: 10},
168168
},
169+
Metadata: Metadata{
170+
Stream: "test-stream",
171+
},
169172
},
170173
{
171174
Key: "key2",
@@ -174,6 +177,9 @@ func TestRename_ValidNames_ColumnIsRenamedInSchemaAndRecords(t *testing.T) {
174177
"family_name": String{Val: "random_lastname"},
175178
"age": Integer{Val: 20},
176179
},
180+
Metadata: Metadata{
181+
Stream: "test-stream",
182+
},
177183
},
178184
{
179185
Key: "key3",
@@ -182,6 +188,9 @@ func TestRename_ValidNames_ColumnIsRenamedInSchemaAndRecords(t *testing.T) {
182188
"family_name": String{Val: "random_lastname2"},
183189
"age": Integer{Val: 30},
184190
},
191+
Metadata: Metadata{
192+
Stream: "test-stream",
193+
},
185194
},
186195
}
187196

@@ -216,7 +225,7 @@ func TestRename_AddAlreadyExistingColumn_PanicsWithAlreadyExistsColumn(t *testin
216225
"age": IntType,
217226
},
218227
}
219-
sdf := core.NewStreamDataFrame(input, output, errors, schema)
228+
sdf := core.NewStreamDataFrame(input, output, errors, schema, "test-stream")
220229

221230
assert.Panicsf(t,
222231
func() {
@@ -239,7 +248,7 @@ func TestRename_AddColumnNameNotExists_PanicsWithColumnNotFound(t *testing.T) {
239248
"age": IntType,
240249
},
241250
}
242-
sdf := core.NewStreamDataFrame(input, output, errors, schema)
251+
sdf := core.NewStreamDataFrame(input, output, errors, schema, "test-stream")
243252

244253
assert.Panicsf(t,
245254
func() {

pkg/functions/schema_validator_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func TestSreamDataFrame_SchemaValidation_AcceptRecordFollowSchema(t *testing.T)
133133
"age": IntType,
134134
},
135135
}
136-
sdf := core.NewStreamDataFrame(input, output, errors, schema)
136+
sdf := core.NewStreamDataFrame(input, output, errors, schema, "test-stream")
137137

138138
// Generate sample data
139139
record_1 := Record{
@@ -184,7 +184,7 @@ func TestSreamDataFrame_SchemaValidation_PanicIfRecordDoesntFollowSchema(t *test
184184
"age": IntType,
185185
},
186186
}
187-
sdf := core.NewStreamDataFrame(input, output, errors, schema)
187+
sdf := core.NewStreamDataFrame(input, output, errors, schema, "test-stream")
188188

189189
// Generate sample data
190190
faulty_record := Record{

pkg/functions/select_test.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func TestSelect_WithDataFrame_SelectOnlyExpectedFields(t *testing.T) {
9191
"age": IntType,
9292
},
9393
}
94-
sdf := core.NewStreamDataFrame(input, output, errors, schema)
94+
sdf := core.NewStreamDataFrame(input, output, errors, schema, "test-stream")
9595

9696
// Logic to test
9797
result_df := sdf.Select("first_name", "age")
@@ -140,20 +140,29 @@ func TestSelect_WithDataFrame_SelectOnlyExpectedFields(t *testing.T) {
140140
"first_name": String{Val: "random_name"},
141141
"age": Integer{Val: 10},
142142
},
143+
Metadata: Metadata{
144+
Stream: "test-stream",
145+
},
143146
},
144147
{
145148
Key: "key2",
146149
Data: ValueMap{
147150
"first_name": String{Val: "foobar"},
148151
"age": Integer{Val: 20},
149152
},
153+
Metadata: Metadata{
154+
Stream: "test-stream",
155+
},
150156
},
151157
{
152158
Key: "key3",
153159
Data: ValueMap{
154160
"first_name": String{Val: "random_name2"},
155161
"age": Integer{Val: 30},
156162
},
163+
Metadata: Metadata{
164+
Stream: "test-stream",
165+
},
157166
},
158167
}
159168

@@ -187,7 +196,7 @@ func TestSelect_SelectInvalidColumnName_PanicsWithColumnNotFound(t *testing.T) {
187196
"age": IntType,
188197
},
189198
}
190-
sdf := core.NewStreamDataFrame(input, output, errors, schema)
199+
sdf := core.NewStreamDataFrame(input, output, errors, schema, "test-stream")
191200

192201
assert.Panicsf(t,
193202
func() {
@@ -212,7 +221,7 @@ func TestSelect_FirstFilterThenSelect_ShouldSuccessfullyFilterRecordsThenSelect(
212221
"age": IntType,
213222
},
214223
}
215-
sdf := core.NewStreamDataFrame(input, output, errors, schema)
224+
sdf := core.NewStreamDataFrame(input, output, errors, schema, "test-stream")
216225

217226
result_df := sdf.Filter(functions.Filter{
218227
ColumnName: "first_name",
@@ -263,6 +272,9 @@ func TestSelect_FirstFilterThenSelect_ShouldSuccessfullyFilterRecordsThenSelect(
263272
"last_name": String{Val: "bar"},
264273
"age": Integer{Val: 10},
265274
},
275+
Metadata: Metadata{
276+
Stream: "test-stream",
277+
},
266278
}
267279
result := <-output
268280
assert.Equal(t, result, accepted_record)

pkg/types/record.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@ type Record struct {
2020

2121
// Metadata contains metadata for a data record, such as timestamps.
2222
type Metadata struct {
23-
Timestamp int64 // Unix timestamp
23+
Timestamp int64 // Unix timestamp
24+
Stream string // Name of the stream record belongs to
2425
}

0 commit comments

Comments
 (0)