-
Notifications
You must be signed in to change notification settings - Fork 7
Performance investigation #7278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
dadd0f2
05ce482
06adf7c
807508a
2aefaaf
ab5a4eb
23b1e5c
354d7b3
b4851d1
7244f40
ec02df8
9555b5f
1b86b02
b8b5421
8626e95
85b4d5d
8a83922
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
description: Improve performance for very large models | ||
issue-nr: 7262 | ||
change-type: minor | ||
destination-branches: [master, iso7] | ||
sections: | ||
minor-improvement: "{{description}}" | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4621,6 +4621,40 @@ def convert_or_ignore(rvid): | |||||
) | ||||||
return out | ||||||
|
||||||
@classmethod | ||||||
async def set_deployed_multi( | ||||||
cls, | ||||||
environment: uuid.UUID, | ||||||
resource_ids: Sequence[m.ResourceIdStr], | ||||||
version: int, | ||||||
connection: Optional[asyncpg.connection.Connection] = None, | ||||||
) -> None: | ||||||
query = "UPDATE resource SET status='deployed' WHERE environment=$1 AND model=$2 AND resource_id =ANY($3) " | ||||||
async with cls.get_connection(connection) as connection: | ||||||
await connection.execute(query, environment, version, resource_ids) | ||||||
|
||||||
@classmethod | ||||||
async def get_resource_ids_with_status( | ||||||
cls, | ||||||
environment: uuid.UUID, | ||||||
resource_version_ids: list[m.ResourceIdStr], | ||||||
version: int, | ||||||
statuses: Sequence[const.ResourceState], | ||||||
lock: Optional[RowLockMode] = None, | ||||||
connection: Optional[asyncpg.connection.Connection] = None, | ||||||
) -> list[m.ResourceIdStr]: | ||||||
query = ( | ||||||
"SELECT resource_id as resource_id FROM resource WHERE " | ||||||
"environment=$1 AND model=$2 AND status = ANY($3) and resource_id =ANY($4) " | ||||||
) | ||||||
if lock: | ||||||
query += lock.value | ||||||
async with cls.get_connection(connection) as connection: | ||||||
return [ | ||||||
m.ResourceIdStr(cast(str, r["resource_id"])) | ||||||
sanderr marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
for r in await connection.fetch(query, environment, version, statuses, resource_version_ids) | ||||||
] | ||||||
|
||||||
@classmethod | ||||||
async def get_undeployable(cls, environment: uuid.UUID, version: int) -> list["Resource"]: | ||||||
""" | ||||||
|
@@ -4796,27 +4830,38 @@ async def get_resources_for_version_raw_with_persistent_state( | |||||
version: int, | ||||||
projection: Optional[list[str]], | ||||||
projection_presistent: Optional[list[str]], | ||||||
project_attributes: Optional[list[str]] = None, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added this so we don't have to pull in all attributes for the resource when not needed, this can be a LOT of data There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
See typing.LiteralString docs. I think we should use this ideally because we bake it into the query itself, which is not safe with any user input. Ideally we'd do the same for the other lists but I'll leave that decision up to you. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mypy doesn't support it python/mypy#12554 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, shame |
||||||
*, | ||||||
connection: Optional[Connection] = None, | ||||||
) -> list[dict[str, object]]: | ||||||
"""This method performs none of the mangling required to produce valid resources!""" | ||||||
"""This method performs none of the mangling required to produce valid resources! | ||||||
|
||||||
project_attributes performs a projection on the json attributes of the resources table | ||||||
sanderr marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
""" | ||||||
|
||||||
def collect_projection(projection: Optional[list[str]], prefix: str) -> str: | ||||||
if not projection: | ||||||
return f"{prefix}.*" | ||||||
else: | ||||||
return ",".join(f"{prefix}.{field}" for field in projection) | ||||||
|
||||||
if project_attributes: | ||||||
json_projection = "," + ",".join(f"r.attributes->'{v}' as {v}" for v in project_attributes) | ||||||
else: | ||||||
json_projection = "" | ||||||
|
||||||
query = f""" | ||||||
SELECT {collect_projection(projection, 'r')}, {collect_projection(projection_presistent, 'ps')} | ||||||
SELECT {collect_projection(projection, 'r')}, {collect_projection(projection_presistent, 'ps')} {json_projection} | ||||||
FROM {cls.table_name()} r JOIN resource_persistent_state ps ON r.resource_id = ps.resource_id | ||||||
WHERE r.environment=$1 AND ps.environment = $1 and r.model = $2;""" | ||||||
|
||||||
resource_records = await cls._fetch_query(query, environment, version, connection=connection) | ||||||
resources = [dict(record) for record in resource_records] | ||||||
for res in resources: | ||||||
if "attributes" in res: | ||||||
res["attributes"] = json.loads(res["attributes"]) | ||||||
if project_attributes: | ||||||
for k in project_attributes: | ||||||
if res[k]: | ||||||
res[k] = json.loads(res[k]) | ||||||
return resources | ||||||
|
||||||
@classmethod | ||||||
|
@@ -5403,6 +5448,7 @@ async def get_list( | |||||
no_obj: Optional[bool] = None, | ||||||
lock: Optional[RowLockMode] = None, | ||||||
connection: Optional[asyncpg.connection.Connection] = None, | ||||||
no_status: bool = False, # don't load the status field | ||||||
**query: object, | ||||||
) -> list["ConfigurationModel"]: | ||||||
# sanitize and validate order parameters | ||||||
|
@@ -5446,14 +5492,21 @@ async def get_list( | |||||
{lock_statement}""" | ||||||
query_result = await cls._fetch_query(query_string, *values, connection=connection) | ||||||
result = [] | ||||||
for record in query_result: | ||||||
record = dict(record) | ||||||
for in_record in query_result: | ||||||
record = dict(in_record) | ||||||
if no_obj: | ||||||
record["status"] = await cls._get_status_field(record["environment"], record["status"]) | ||||||
if no_status: | ||||||
record["status"] = {} | ||||||
else: | ||||||
record["status"] = await cls._get_status_field(record["environment"], record["status"]) | ||||||
result.append(record) | ||||||
else: | ||||||
done = record.pop("done") | ||||||
status = await cls._get_status_field(record["environment"], record.pop("status")) | ||||||
if no_status: | ||||||
status = {} | ||||||
record.pop("status") | ||||||
else: | ||||||
status = await cls._get_status_field(record["environment"], record.pop("status")) | ||||||
obj = cls(from_postgres=True, **record) | ||||||
obj._done = done | ||||||
obj._status = status | ||||||
|
@@ -5706,7 +5759,6 @@ async def get_increment( | |||||
projection_a_resource = [ | ||||||
"resource_id", | ||||||
"attribute_hash", | ||||||
"attributes", | ||||||
"status", | ||||||
] | ||||||
projection_a_state = [ | ||||||
|
@@ -5715,11 +5767,12 @@ async def get_increment( | |||||
"last_deployed_attribute_hash", | ||||||
"last_non_deploying_status", | ||||||
] | ||||||
projection_a_attributes = ["requires", "send_event"] | ||||||
projection = ["resource_id", "status", "attribute_hash"] | ||||||
|
||||||
# get resources for agent | ||||||
resources = await Resource.get_resources_for_version_raw_with_persistent_state( | ||||||
environment, version, projection_a_resource, projection_a_state, connection=connection | ||||||
environment, version, projection_a_resource, projection_a_state, projection_a_attributes, connection=connection | ||||||
) | ||||||
|
||||||
# to increment | ||||||
|
@@ -5740,20 +5793,11 @@ async def get_increment( | |||||
continue | ||||||
# Now outstanding events | ||||||
last_success = resource["last_success"] or DATETIME_MIN_UTC | ||||||
attributes = resource["attributes"] | ||||||
assert isinstance(attributes, dict) # mypy | ||||||
for req in attributes["requires"]: | ||||||
for req in resource["requires"]: | ||||||
req_res = id_to_resource[req] | ||||||
assert req_res is not None # todo | ||||||
req_res_attributes = req_res["attributes"] | ||||||
assert isinstance(req_res_attributes, dict) # mypy | ||||||
last_produced_events = req_res["last_produced_events"] | ||||||
if ( | ||||||
last_produced_events is not None | ||||||
and last_produced_events > last_success | ||||||
and "send_event" in req_res_attributes | ||||||
and req_res_attributes["send_event"] | ||||||
): | ||||||
if last_produced_events is not None and last_produced_events > last_success and req_res["send_event"]: | ||||||
in_increment = True | ||||||
break | ||||||
|
||||||
|
@@ -5839,9 +5883,9 @@ async def get_increment( | |||||
|
||||||
# build lookup tables | ||||||
for res in resources: | ||||||
for req in res["attributes"]["requires"]: | ||||||
for req in res["requires"]: | ||||||
original_provides[req].append(res["resource_id"]) | ||||||
if "send_event" in res["attributes"] and res["attributes"]["send_event"]: | ||||||
if res["send_event"]: | ||||||
send_events.append(res["resource_id"]) | ||||||
|
||||||
# recursively include stuff potentially receiving events from nodes in the increment | ||||||
|
Uh oh!
There was an error while loading. Please reload this page.