Skip to content

Commit d82fae1

Browse files
committed
feat: Add StreamTable Inner join functionality to the main SDF
1 parent 771c859 commit d82fae1

File tree

5 files changed

+186
-15
lines changed

5 files changed

+186
-15
lines changed

pkg/core/config.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
package core
22

3-
import "log/slog"
3+
import (
4+
"log/slog"
5+
6+
"github.com/farbodahm/streame/pkg/state_store"
7+
)
48

59
// Option implements the Functional Option pattern for StreamDataFrame
610
type Option func(*Config)
711

812
// Config is the configuration options for StreamDataFrame
913
type Config struct {
10-
LogLevel slog.Level
14+
LogLevel slog.Level
15+
StateStore state_store.StateStore
1116
}
1217

1318
// WithLogLevel sets the log level for StreamDataFrame
@@ -16,3 +21,12 @@ func WithLogLevel(level slog.Level) Option {
1621
c.LogLevel = level
1722
}
1823
}
24+
25+
// WithStateStore sets the state store for StreamDataFrame.
26+
// If not set, the default in-memory state store will be used
27+
// which is not recommended for production use
28+
func WithStateStore(ss state_store.StateStore) Option {
29+
return func(c *Config) {
30+
c.StateStore = ss
31+
}
32+
}

pkg/core/config_test.go

+17-5
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,22 @@ package core_test
22

33
import (
44
"log/slog"
5+
"os"
56
"testing"
67

8+
"github.com/cockroachdb/pebble"
79
. "github.com/farbodahm/streame/pkg/core"
10+
"github.com/farbodahm/streame/pkg/state_store"
811
"github.com/farbodahm/streame/pkg/types"
912
"github.com/stretchr/testify/assert"
1013
)
1114

1215
func TestStreamDataFrame_ConfigDefaultValues_DefaultValuesAssignedCorrectly(t *testing.T) {
1316
sdf := NewStreamDataFrame(nil, nil, nil, types.Schema{}, "test-stream")
1417

15-
default_config := Config{
16-
LogLevel: slog.LevelInfo,
17-
}
18-
19-
assert.Equal(t, default_config, *sdf.Configs)
18+
assert.Equal(t, sdf.Configs.LogLevel, slog.LevelInfo)
19+
_, in_memory_ss := sdf.Configs.StateStore.(*state_store.InMemorySS)
20+
assert.True(t, in_memory_ss)
2021
}
2122

2223
func TestStreamDataFrame_ConfigWithLogLevel_LogLevelAssignedCorrectly(t *testing.T) {
@@ -26,3 +27,14 @@ func TestStreamDataFrame_ConfigWithLogLevel_LogLevelAssignedCorrectly(t *testing
2627

2728
assert.Equal(t, slog.LevelError, sdf.Configs.LogLevel)
2829
}
30+
31+
func TestStreamDataFrame_WithStateStore_StateStoreAssignedCorrectly(t *testing.T) {
32+
warehouse_path := "./test-path"
33+
defer os.RemoveAll(warehouse_path)
34+
ss, _ := state_store.NewPebbleStateStore(warehouse_path, &pebble.Options{})
35+
sdf := NewStreamDataFrame(nil, nil, nil, types.Schema{}, "test-stream",
36+
WithStateStore(ss),
37+
)
38+
39+
assert.Equal(t, ss, sdf.Configs.StateStore)
40+
}

pkg/core/dataframe.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type DataFrame interface {
1414
Select(columns ...string) DataFrame
1515
AddStaticColumn(name string, value types.ColumnValue) DataFrame
1616
Rename(old_name string, new_name string) DataFrame
17-
Join(other DataFrame, how join.JoinType, on join.JoinCondition) DataFrame
17+
Join(other *StreamDataFrame, how join.JoinType, on join.JoinCondition, mode join.JoinMode) DataFrame
1818
Execute(ctx context.Context) error
1919
GetSchema() types.Schema
2020
}

pkg/core/stream_dataframe.go

+76-7
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import (
44
"context"
55
"errors"
66
"log/slog"
7+
"strings"
78

89
"github.com/farbodahm/streame/pkg/functions"
910
"github.com/farbodahm/streame/pkg/functions/join"
11+
"github.com/farbodahm/streame/pkg/state_store"
1012
"github.com/farbodahm/streame/pkg/types"
1113
"github.com/farbodahm/streame/pkg/utils"
1214
"github.com/google/uuid"
@@ -24,6 +26,15 @@ type StreamDataFrame struct {
2426
Stages []Stage
2527
Schema types.Schema
2628
Configs *Config
29+
30+
stateStore state_store.StateStore
31+
// previousExecutors holds all of the SDFs that the current SDF relies on.
32+
// Currently only the `Join` operation requires this structure so that it can first run
33+
// all of the previous SDFs before running itself.
34+
previousExecutors []*StreamDataFrame
35+
36+
// TODO: Refactor to a better way of storing runtime configs
37+
runtimeConfig map[string]any
2738
}
2839

2940
// NewStreamDataFrame creates a new StreamDataFrame with the given options
@@ -37,13 +48,20 @@ func NewStreamDataFrame(
3748
) StreamDataFrame {
3849
// Create config with default values
3950
config := Config{
40-
LogLevel: slog.LevelInfo,
51+
LogLevel: slog.LevelInfo,
52+
StateStore: state_store.NewInMemorySS(),
4153
}
4254
// Functional Option pattern
4355
for _, option := range options {
4456
option(&config)
4557
}
4658

59+
utils.InitLogger(config.LogLevel)
60+
61+
if _, ok := config.StateStore.(*state_store.InMemorySS); ok {
62+
utils.Logger.Warn("Using in-memory state store. This is not suitable for production use.")
63+
}
64+
4765
sdf := StreamDataFrame{
4866
SourceStream: sourceStream,
4967
OutputStream: outputStream,
@@ -52,10 +70,18 @@ func NewStreamDataFrame(
5270
Stages: []Stage{},
5371
Schema: schema,
5472
Configs: &config,
73+
74+
runtimeConfig: make(map[string]any),
75+
stateStore: config.StateStore,
76+
previousExecutors: []*StreamDataFrame{},
77+
}
78+
79+
// Only source streams need to have schema validation. When an SDF
80+
// is created by joining 2 other streams, it doesn't need any schema validation stage.
81+
if !strings.HasSuffix(streamName, join.JoinedStreamSuffix) {
82+
sdf.validateSchema()
5583
}
5684

57-
sdf.validateSchema()
58-
utils.InitLogger(config.LogLevel)
5985
return sdf
6086
}
6187

@@ -85,8 +111,45 @@ func (sdf *StreamDataFrame) Select(columns ...string) DataFrame {
85111
}
86112

87113
// Join joins the DataFrame with another DataFrame based on the given join type and condition
88-
func (sdf *StreamDataFrame) Join(other DataFrame, how join.JoinType, on join.JoinCondition) DataFrame {
89-
panic("Not Implemented")
114+
func (sdf *StreamDataFrame) Join(other *StreamDataFrame, how join.JoinType, on join.JoinCondition, mode join.JoinMode) DataFrame {
115+
// Validate join condition
116+
err := join.ValidateJoinCondition(sdf.Schema, other.Schema, on)
117+
if err != nil {
118+
panic(err)
119+
}
120+
121+
// Merge schemas
122+
new_schema, err := join.MergeSchema(sdf.Schema, other.GetSchema())
123+
if err != nil {
124+
panic(err)
125+
}
126+
127+
// Fan-In pattern to join 2 streams into 1 stream
128+
merged_sources := utils.MergeChannels(sdf.OutputStream, other.OutputStream)
129+
merged_errors := utils.MergeChannels(sdf.ErrorStream, other.ErrorStream)
130+
131+
out := make(chan (types.Record))
132+
new_sdf := NewStreamDataFrame(
133+
merged_sources,
134+
out,
135+
merged_errors,
136+
new_schema,
137+
sdf.Name+"-"+other.Name+join.JoinedStreamSuffix,
138+
)
139+
// TODO: Decide on configs
140+
new_sdf.Configs = sdf.Configs
141+
142+
new_sdf.runtimeConfig[sdf.Name] = join.Stream
143+
new_sdf.runtimeConfig[other.Name] = join.Table
144+
145+
executor := func(ctx context.Context, record types.Record) ([]types.Record, error) {
146+
record_type := new_sdf.runtimeConfig[record.Metadata.Stream].(join.RecordType)
147+
return join.InnerJoinStreamTable(new_sdf.stateStore, record_type, record, on), nil
148+
}
149+
150+
new_sdf.previousExecutors = append(new_sdf.previousExecutors, sdf, other)
151+
new_sdf.addToStages(executor)
152+
return &new_sdf
90153
}
91154

92155
// Filter applies filter function to each record of the DataFrame
@@ -206,11 +269,17 @@ func (sdf *StreamDataFrame) addToStages(executor StageExecutor) {
206269
// It simply runs all of the stages.
207270
// It's a blocking call: it returns when the context is cancelled and panics when an error occurs.
208271
func (sdf *StreamDataFrame) Execute(ctx context.Context) error {
209-
utils.Logger.Info("Executing processor with", "len(stages)", len(sdf.Stages))
272+
utils.Logger.Info("Executing processor", "name", sdf.Name, "len(stages)", len(sdf.Stages))
210273
if len(sdf.Stages) == 0 {
211274
return errors.New("no stages are created")
212275
}
213276

277+
// First start the previous SDFs that the current SDF depends on (if there are any)
278+
for _, previous_sdf := range sdf.previousExecutors {
279+
utils.Logger.Info("Executing previous SDF", "name", previous_sdf.Name)
280+
go previous_sdf.Execute(ctx)
281+
}
282+
214283
for _, stage := range sdf.Stages {
215284
go stage.Run(ctx)
216285
}
@@ -220,7 +289,7 @@ func (sdf *StreamDataFrame) Execute(ctx context.Context) error {
220289
case err := <-sdf.ErrorStream:
221290
panic(err)
222291
case <-ctx.Done():
223-
utils.Logger.Info("Processor execution completed")
292+
utils.Logger.Info("Processor execution completed", "name", sdf.Name)
224293
return nil // Exit the loop if the context is cancelled
225294
}
226295
}

pkg/functions/join/join_test.go

+76
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package join_test
22

33
import (
4+
"context"
45
"testing"
56

7+
"github.com/farbodahm/streame/pkg/core"
68
"github.com/farbodahm/streame/pkg/functions/join"
79
"github.com/farbodahm/streame/pkg/state_store"
810
. "github.com/farbodahm/streame/pkg/types"
@@ -135,3 +137,77 @@ func TestInnerJoinStreamTable_WithStreamRecord_JoinSuccessfully(t *testing.T) {
135137
err = ss.Close()
136138
assert.Nil(t, err)
137139
}
140+
141+
// Integration tests
142+
func TestJoin_SimpleStreamTableJoin_ShouldJoinStreamRecordToTableRecord(t *testing.T) {
143+
// User Data
144+
user_input := make(chan Record)
145+
user_output := make(chan Record)
146+
user_errors := make(chan error)
147+
user_schema := Schema{
148+
Columns: Fields{
149+
"email": StringType,
150+
"first_name": StringType,
151+
"last_name": StringType,
152+
},
153+
}
154+
user_sdf := core.NewStreamDataFrame(user_input, user_output, user_errors, user_schema, "user-stream")
155+
156+
// Order Data
157+
order_input := make(chan Record)
158+
orders_output := make(chan Record)
159+
orders_errors := make(chan error)
160+
orders_schema := Schema{
161+
Columns: Fields{
162+
"user_email": StringType,
163+
"amount": IntType,
164+
},
165+
}
166+
orders_sdf := core.NewStreamDataFrame(order_input, orders_output, orders_errors, orders_schema, "orders-stream")
167+
168+
// Logic to test
169+
joined_sdf := orders_sdf.Join(&user_sdf, join.Inner, join.JoinCondition{LeftKey: "user_email", RightKey: "email"}, join.StreamTable).(*core.StreamDataFrame)
170+
171+
go func() {
172+
user_input <- Record{
173+
Key: "key1",
174+
Data: ValueMap{
175+
"first_name": String{Val: "foo"},
176+
"last_name": String{Val: "bar"},
177+
"email": String{Val: "[email protected]"},
178+
},
179+
}
180+
order_input <- Record{
181+
Key: "key2",
182+
Data: ValueMap{
183+
"user_email": String{Val: "[email protected]"},
184+
"amount": Integer{Val: 100},
185+
},
186+
}
187+
}()
188+
189+
ctx, cancel := context.WithCancel(context.Background())
190+
go joined_sdf.Execute(ctx)
191+
192+
result := <-joined_sdf.OutputStream
193+
cancel()
194+
// Assertions
195+
expected_record := Record{
196+
Key: "key2-key1",
197+
Data: ValueMap{
198+
"user_email": String{Val: "[email protected]"},
199+
"amount": Integer{Val: 100},
200+
"first_name": String{Val: "foo"},
201+
"last_name": String{Val: "bar"},
202+
"email": String{Val: "[email protected]"},
203+
},
204+
Metadata: Metadata{
205+
Stream: orders_sdf.Name + "-" + user_sdf.Name + join.JoinedStreamSuffix,
206+
},
207+
}
208+
assert.Equal(t, expected_record, result)
209+
assert.Equal(t, 0, len(user_errors))
210+
assert.Equal(t, 0, len(orders_errors))
211+
assert.Equal(t, 0, len(joined_sdf.ErrorStream))
212+
assert.Equal(t, 0, len(joined_sdf.OutputStream))
213+
}

0 commit comments

Comments
 (0)