Skip to content

Commit b47de41

Browse files
committed
fixes lexico ordering
1 parent b52240c commit b47de41

File tree

3 files changed

+24
-6
lines changed

3 files changed

+24
-6
lines changed

worker/pkg/benthos/sql/input_sql_raw.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,15 @@ func (s *pooledInput) Connect(ctx context.Context) error {
172172
}
173173
s.logger.Debug("using paged query")
174174
query = *s.pagedQueryStatic
175-
args = append(args, s.continuationToken.Contents.LastReadOrderValues...)
175+
176+
// Build arguments for lexicographical ordering
177+
lastValues := s.continuationToken.Contents.LastReadOrderValues
178+
for i := 0; i < len(s.orderByColumns); i++ {
179+
// For each OR condition, add values up to and including current position
180+
for j := 0; j <= i; j++ {
181+
args = append(args, lastValues[j])
182+
}
183+
}
176184
args = append(args, *s.expectedTotalRows)
177185
}
178186

worker/pkg/query-builder2/querybuilder.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -165,12 +165,19 @@ func (qb *QueryBuilder) buildPageQuery(schema, tableName string, query *goqu.Sel
165165
key := qb.getTableKey(schema, tableName)
166166
orderBy := qb.orderBy[key]
167167
if len(orderBy) > 0 {
168-
// Add where clause using order by columns directly with goqu
169-
conditions := make([]exp.Expression, len(orderBy))
170-
for i, col := range orderBy {
171-
conditions[i] = goqu.T(rootAlias).Col(col).Gt(0)
168+
// Build lexicographical ordering conditions
169+
var conditions []exp.Expression
170+
for i := 0; i < len(orderBy); i++ {
171+
var subConditions []exp.Expression
172+
// Add equality conditions for all columns before current
173+
for j := 0; j < i; j++ {
174+
subConditions = append(subConditions, goqu.T(rootAlias).Col(orderBy[j]).Eq(goqu.L("?", 0)))
175+
}
176+
// Add greater than condition for current column
177+
subConditions = append(subConditions, goqu.T(rootAlias).Col(orderBy[i]).Gt(goqu.L("?", 0)))
178+
conditions = append(conditions, goqu.And(subConditions...))
172179
}
173-
query = query.Where(goqu.And(conditions...))
180+
query = query.Where(goqu.Or(conditions...))
174181
}
175182
return query.Prepared(true)
176183
}

worker/pkg/workflows/datasync/workflow/workflow.go

+3
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,9 @@ func invokeSync(
536536
err := workflow.ExecuteChildWorkflow(workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
537537
WorkflowID: getChildWorkflowId(info.WorkflowExecution.ID, config.Name, workflow.Now(ctx)),
538538
StaticSummary: fmt.Sprintf("Syncing %s.%s", config.TableSchema, config.TableName),
539+
RetryPolicy: &temporal.RetryPolicy{
540+
MaximumAttempts: 1,
541+
},
539542
}), tsWf.TableSync, &tablesync_workflow.TableSyncRequest{
540543
AccountId: accId,
541544
Id: config.Name,

0 commit comments

Comments
 (0)