Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handling of Unmarshall of multiple files processed in parallel #27

Closed
marek-kuticka opened this issue Jun 28, 2023 · 9 comments · Fixed by #28
Closed

handling of Unmarshall of multiple files processed in parallel #27

marek-kuticka opened this issue Jun 28, 2023 · 9 comments · Fixed by #28
Assignees

Comments

@marek-kuticka
Copy link

as mentioned in Issue 26

proper solution to report errors from unknown elements is to implement methods for each part of the structure.

question remains, if marshmallow.Unmarshall() is executed in closure, in parallel processing, how can I provide the information to the error handlers, which file is being processed, especiallyt in parallel processing?

thx a lot

@avivpxi
Copy link
Collaborator

avivpxi commented Jun 29, 2023

@marek-kuticka can you share the code or a relevant example? And explain in the example what do you mean by parallel processing, and by provide information to the error handlers

@marek-kuticka
Copy link
Author

Good morning

I have managed to implement this, below see the patch.

Index: unmarshal.go
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/unmarshal.go b/unmarshal.go
--- a/unmarshal.go	(revision e4159cd02bfdd8cfd15f8dcf4d6176c88fe138e3)
+++ b/unmarshal.go	(date 1688026210791)
@@ -24,13 +24,13 @@
 // returning ErrInvalidValue.
 // - Unmarshal supports three types of Mode values. Each mode is self documented and affects
 // how Unmarshal behaves.
-func Unmarshal(data []byte, v interface{}, options ...UnmarshalOption) (map[string]interface{}, error) {
+func Unmarshal(fileName string, data []byte, v interface{}, options ...UnmarshalOption) (map[string]interface{}, error) {
 	if !isValidValue(v) {
 		return nil, ErrInvalidValue
 	}
 	opts := buildUnmarshalOptions(options)
 	useMultipleErrors := opts.mode == ModeAllowMultipleErrors || opts.mode == ModeFailOverToOriginalValue
-	d := &decoder{options: opts, lexer: &jlexer.Lexer{Data: data, UseMultipleErrors: useMultipleErrors}}
+	d := &decoder{fileName: fileName, options: opts, lexer: &jlexer.Lexer{Data: data, UseMultipleErrors: useMultipleErrors}}
 	result := make(map[string]interface{})
 	if d.lexer.IsNull() {
 		d.lexer.Skip()
@@ -55,8 +55,9 @@
 }
 
 type decoder struct {
-	options *unmarshalOptions
-	lexer   *jlexer.Lexer
+	options  *unmarshalOptions
+	lexer    *jlexer.Lexer
+	fileName string
 }
 
 func (d *decoder) populateStruct(forcePopulate bool, structInstance interface{}, result map[string]interface{}) (interface{}, bool) {
@@ -317,7 +318,7 @@
 	data := make(map[string]interface{})
 	result, valid := d.populateStruct(true, value, data)
 	if valid {
-		handler.HandleJSONData(data)
+		handler.HandleJSONData(data, d.fileName)
 	}
 	return result, valid
 }

decoder struct has received a fileName, and it is passed from the Unmarshall function, way down to the call to the handler.

code, which does the parallel processing is:

func ReadJson() {
	var waitGroup sync.WaitGroup

	dir := "/Users/marek/Downloads/fashion-dataset/styles/"

	files, err := os.ReadDir(dir)
	if err != nil {
		log.Fatal(err)
	}

	fdb := db.DB()
	
	for _, file := range files {
		waitGroup.Add(1)
		strFile := dir + file.Name()

		go func() {
			defer waitGroup.Done()

			processFile(strFile)
			_, err := fdb.Collection("logs").InsertOne(context.TODO(), bson.D{
				{"task", "File Processing"},
				{"name", strFile},
			})

			if err != nil {
				panic(err)
			}

		}()
	}

	defer func() {
		if err := fdb.Client().Disconnect(context.TODO()); err != nil {
			panic(err)
		}
	}()

	waitGroup.Wait()

}

func processFile(file string) {
	v, err := os.ReadFile(file) //read the content of file
	if err != nil {
		panic(err)
	}

	var prod Product

	_, err = marshmallow.Unmarshal(file, v, &prod, marshmallow.WithExcludeKnownFieldsFromMap(true))
	if err != nil {
		return
	}
}

func logError(err string, fileName string) {
	db.DB().Collection("logs").InsertOne(context.TODO(), bson.D{
		{"name", fileName},
		{"message", err},
	})
}

with multiple handler functions as follows:

func (p *Product) HandleJSONData(data map[string]interface{}, fileName string) {
	for key := range data {
		logError(fmt.Sprintf("for Product: unknown field in json: %s", key), fileName)
	}
}

issue was, that if handler, shown in last code piece above has no information about the file name, there is no option to find out later.

this is only a quick change, probably better would be to just take file name as parameter and read it into bytes within Unmarshall.

Marek

@marek-kuticka
Copy link
Author

or.. include it maybe in options? ... marshmallow.WithFileName(...) instead of a direct parameter within Unmarshall func

@avivpxi
Copy link
Collaborator

avivpxi commented Jun 29, 2023

@marek-kuticka
Quick solution:

func processFile(file string) {
	v, err := os.ReadFile(file) //read the content of file
	if err != nil {
		panic(err)
	}

	prod := Product{file: file}

	_, err = marshmallow.Unmarshal(file, v, &prod, marshmallow.WithExcludeKnownFieldsFromMap(true))
	if err != nil {
		return
	}
        for e := range prod.errors {
                logError(e)
        }
}

type Product struct {
        // ...

        file       string
        errors []string
}

func (p *Product) HandleJSONData(data map[string]interface{}) {
	for key := range data {
		p.errors = append(p.errors, fmt.Sprintf("for Product: unknown field in json: %s", key), p.file))
	}
}

You can achieve that with the current API of Marshmallow.
I do not wish to create APIs to support various use cases and needs and I do think HandleJSONData allow writing a solution to all use cases by using the struct fields.
However, I do think that error handling is missing from the HandleJSONData and I suggest we add an additional interface here, something like this:

type JSONDataHandler interface {
	HandleJSONData(data map[string]interface{})
}

type JSONDataHandlerWithError interface {
	HandleJSONData(data map[string]interface{}) error
}

if the second one is implemented and returns an error we can propagate it back to the marshmallow.Unmarshal caller, i.e.:

func processFile(file string) {
	v, err := os.ReadFile(file) //read the content of file
	if err != nil {
		panic(err)
	}

	var prod Product

	_, err = marshmallow.Unmarshal(file, v, &prod, marshmallow.WithExcludeKnownFieldsFromMap(true))
	if err != nil {
		logError(err, file)
	}
}

func (p *Product) HandleJSONData(data map[string]interface{}) error {
        var err error
	for key := range data {
                 err = errors.Join(err, fmt.Errorf("for Product: unknown field in json: %s", key))
	}
        return err
}

If the first one looks sufficient - go for it.
If you prefer the second one, which I personally agree with, feel free to open a PR with the required changes

@marek-kuticka
Copy link
Author

I'd have to look into this, as... I think your suggestions works only in case, that we report all levels to top level structure, as discussed in previous issue.

However, if the structure of JSON files is much more complicated, and I have approx 15 structures, and thus 15 handler methods, so I have to check, if it works as you suggested

@avivpxi
Copy link
Collaborator

avivpxi commented Jun 30, 2023

Still possible, add an errors field to each of the 15 structures, then traverse the whole tree to look then up and log all of them

@marek-kuticka let me know if you want to try implementing the second option, if not, i think i can get to it early next week

@marek-kuticka
Copy link
Author

So.. Bind the "name" to the Product, thus not needed to put it into Decoder.. Then traverse All to collect errors and bind them to product itself? Should be feasible.

Not sure I understood fully the 2nd Option. I would maybe try to implement this, would probably need some more details.

Thx. Marek

@avivpxi
Copy link
Collaborator

avivpxi commented Jun 30, 2023

Go for the first.
I'll give the second option a go next week and let you know

@avivpxi avivpxi linked a pull request Jul 2, 2023 that will close this issue
@avivpxi avivpxi self-assigned this Jul 2, 2023
@avivpxi
Copy link
Collaborator

avivpxi commented Jul 3, 2023

@marek-kuticka v1.1.5 is released with support for reporting errors from HandleJSONData, example usage for your use case:

func processFile(file string) {
	v, err := os.ReadFile(file) //read the content of file
	if err != nil {
		panic(err)
	}

	var prod Product

	_, err = marshmallow.Unmarshal(file, v, &prod, marshmallow.WithExcludeKnownFieldsFromMap(true))
	if err != nil {
                // all errors returned from any nested `HandleJSONData` function call will be available here.
                // if more than one `HandleJSONData` call returned an error it will be available as a marshmallow.MultipleLexerError
                // play with it to get a grip on things
		logError(err, file)
	}
}

func (p *Product) HandleJSONData(data map[string]interface{}) error {
        var err error
	for key := range data {
                 err = errors.Join(err, fmt.Errorf("for Product: unknown field in json: %s", key))
	}
        return err
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants