-
-
Notifications
You must be signed in to change notification settings - Fork 78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Multiple Aggregates #254
base: main
Are you sure you want to change the base?
Multiple Aggregates #254
Conversation
Merging from upstream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks the approach is great! I left comments
|
||
def expand(self, pcol: pvalue.PCollection): | ||
columns = { | ||
self.col_name[i]: pcol | "agg " + str(i) >> self._getTransform( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the right idea to add numbers to solve problem with duplicating labels!
Nit: f"Aggregation{i}"
Comment about adding numbers to label names: In BeamBackend
such function was implemented with UniqueLabelGenerator class. But here it's simple enough, so I think the current approach to add numbers instead of using UniqueLabelGenerator
makes sense.
class Aggregate(PrivatePTransform): | ||
"""Transform class for performing multiple aggregations on a PrivatePCollection.""" | ||
|
||
def __init__(self, label=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've though in more details, in most use cases aggregations will share the same parameters (and also sharing the same parameters will help to optimize performance and utility of queries). Could you please
- add argument
params
of type AggregateParams.
2.add argument partition_extractor_fn
Those arguments will be used in each aggregation
col_name: name of the column for the resulting aggregate value. | ||
agg_type: type of pipeline_dp.Metrics identifying the aggregate | ||
to calculate.""" | ||
return _Aggregate([args], col_name=[col_name], agg_type=[agg_type]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks that we can have only one class Aggregate
, w/o _Aggregate
namely
aggregate_value
returns self.aggregate_value
saves in some member variable information about aggregations.expand
works as in_Aggregate
The advantage is that it will be simpler and no need to create multiple instances of _Aggregate
. WDYT?
_agg_named_tuple_cache = {} | ||
|
||
|
||
def _get_or_create_named_tuple(type_name: str, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, this is a correct approach to generate dynamic tuples!
def __init__(self, label=None): | ||
super().__init__(return_anonymized=True, label=label) | ||
|
||
def aggregate_value(self, *args, col_name: str, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since all shared parameters will be provided in the constructor, there are just a few parameters that's needed value_extractor
'AggregatesTuple', tuple(["pid"] + [k for k in x[1]]), | ||
tuple([x[0]] + [x[1][k][0] for k in x[1]]))) | ||
|
||
def _getTransform(self, agg_type: pipeline_dp.Metrics, *args): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest to use DPEngine.aggregate
instead of PrivatePTransforms
Mean
, Sum
, Count
. The benefits are that in future we can optimize performance/utility by computing multiple aggregations with DPEngine.aggregate
class Aggregate can aggregate only for one partition, but by multiple values. For each value aggregation, we run DPEngine.aggregate we need to have AggregateParams:
Some of those parameters are common for all values to aggregate (i.e. they needed to be specified in constructor in 1.
|
Description
Please include a summary of the change, the motivation, and any additional context that will help others understand your PR. If it closes one or more open issues, please tag them as described here.
Affected Dependencies
How has this been tested?
Checklist