Skip to content

Commit 6a17a62

Browse files
AIP-84 Patch Pool (#43266)
* AIP-84 Patch Pool * Fix CI
1 parent ca2c809 commit 6a17a62

File tree

11 files changed

+500
-8
lines changed

11 files changed

+500
-8
lines changed

Diff for: airflow/api_connexion/endpoints/pool_endpoint.py

+1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ def get_pools(
8787
return pool_collection_schema.dump(PoolCollection(pools=pools, total_entries=total_entries))
8888

8989

90+
@mark_fastapi_migration_done
9091
@security.requires_access_pool("PUT")
9192
@action_logging
9293
@provide_session

Diff for: airflow/api_fastapi/core_api/openapi/v1-generated.yaml

+91
Original file line numberDiff line numberDiff line change
@@ -1163,6 +1163,72 @@ paths:
11631163
application/json:
11641164
schema:
11651165
$ref: '#/components/schemas/HTTPValidationError'
1166+
patch:
1167+
tags:
1168+
- Pool
1169+
summary: Patch Pool
1170+
description: Update a Pool.
1171+
operationId: patch_pool
1172+
parameters:
1173+
- name: pool_name
1174+
in: path
1175+
required: true
1176+
schema:
1177+
type: string
1178+
title: Pool Name
1179+
- name: update_mask
1180+
in: query
1181+
required: false
1182+
schema:
1183+
anyOf:
1184+
- type: array
1185+
items:
1186+
type: string
1187+
- type: 'null'
1188+
title: Update Mask
1189+
requestBody:
1190+
required: true
1191+
content:
1192+
application/json:
1193+
schema:
1194+
$ref: '#/components/schemas/PoolBody'
1195+
responses:
1196+
'200':
1197+
description: Successful Response
1198+
content:
1199+
application/json:
1200+
schema:
1201+
$ref: '#/components/schemas/PoolResponse'
1202+
'400':
1203+
content:
1204+
application/json:
1205+
schema:
1206+
$ref: '#/components/schemas/HTTPExceptionResponse'
1207+
description: Bad Request
1208+
'401':
1209+
content:
1210+
application/json:
1211+
schema:
1212+
$ref: '#/components/schemas/HTTPExceptionResponse'
1213+
description: Unauthorized
1214+
'403':
1215+
content:
1216+
application/json:
1217+
schema:
1218+
$ref: '#/components/schemas/HTTPExceptionResponse'
1219+
description: Forbidden
1220+
'404':
1221+
content:
1222+
application/json:
1223+
schema:
1224+
$ref: '#/components/schemas/HTTPExceptionResponse'
1225+
description: Not Found
1226+
'422':
1227+
description: Validation Error
1228+
content:
1229+
application/json:
1230+
schema:
1231+
$ref: '#/components/schemas/HTTPValidationError'
11661232
/public/pools/:
11671233
get:
11681234
tags:
@@ -2222,6 +2288,31 @@ components:
22222288
- timetables
22232289
title: PluginResponse
22242290
description: Plugin serializer.
2291+
PoolBody:
2292+
properties:
2293+
pool:
2294+
anyOf:
2295+
- type: string
2296+
- type: 'null'
2297+
title: Pool
2298+
slots:
2299+
anyOf:
2300+
- type: integer
2301+
- type: 'null'
2302+
title: Slots
2303+
description:
2304+
anyOf:
2305+
- type: string
2306+
- type: 'null'
2307+
title: Description
2308+
include_deferred:
2309+
anyOf:
2310+
- type: boolean
2311+
- type: 'null'
2312+
title: Include Deferred
2313+
type: object
2314+
title: PoolBody
2315+
description: Pool serializer for bodies.
22252316
PoolCollectionResponse:
22262317
properties:
22272318
pools:

Diff for: airflow/api_fastapi/core_api/routes/public/pools.py

+43-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19-
from fastapi import Depends, HTTPException
19+
from fastapi import Depends, HTTPException, Query
20+
from fastapi.exceptions import RequestValidationError
21+
from pydantic import ValidationError
2022
from sqlalchemy import delete, select
2123
from sqlalchemy.orm import Session
2224
from typing_extensions import Annotated
@@ -25,7 +27,12 @@
2527
from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset, SortParam
2628
from airflow.api_fastapi.common.router import AirflowRouter
2729
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
28-
from airflow.api_fastapi.core_api.serializers.pools import PoolCollectionResponse, PoolResponse
30+
from airflow.api_fastapi.core_api.serializers.pools import (
31+
BasePool,
32+
PoolBody,
33+
PoolCollectionResponse,
34+
PoolResponse,
35+
)
2936
from airflow.models.pool import Pool
3037

3138
pools_router = AirflowRouter(tags=["Pool"], prefix="/pools")
@@ -95,3 +102,37 @@ async def get_pools(
95102
pools=[PoolResponse.model_validate(pool, from_attributes=True) for pool in pools],
96103
total_entries=total_entries,
97104
)
105+
106+
107+
@pools_router.patch("/{pool_name}", responses=create_openapi_http_exception_doc([400, 401, 403, 404]))
108+
async def patch_pool(
109+
pool_name: str,
110+
patch_body: PoolBody,
111+
session: Annotated[Session, Depends(get_session)],
112+
update_mask: list[str] | None = Query(None),
113+
) -> PoolResponse:
114+
"""Update a Pool."""
115+
# Only slots and include_deferred can be modified in 'default_pool'
116+
if pool_name == Pool.DEFAULT_POOL_NAME:
117+
if update_mask and all(mask.strip() in {"slots", "include_deferred"} for mask in update_mask):
118+
pass
119+
else:
120+
raise HTTPException(400, "Only slots and included_deferred can be modified on Default Pool")
121+
122+
pool = session.scalar(select(Pool).where(Pool.pool == pool_name).limit(1))
123+
if not pool:
124+
raise HTTPException(404, detail=f"The Pool with name: `{pool_name}` was not found")
125+
126+
if update_mask:
127+
data = patch_body.model_dump(include=set(update_mask), by_alias=True)
128+
else:
129+
data = patch_body.model_dump(by_alias=True)
130+
try:
131+
BasePool.model_validate(data)
132+
except ValidationError as e:
133+
raise RequestValidationError(errors=e.errors())
134+
135+
for key, value in data.items():
136+
setattr(pool, key, value)
137+
138+
return PoolResponse.model_validate(pool, from_attributes=True)

Diff for: airflow/api_fastapi/core_api/routes/public/variables.py

-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@ async def patch_variable(
119119
data = patch_body.model_dump(exclude=non_update_fields)
120120
for key, val in data.items():
121121
setattr(variable, key, val)
122-
session.add(variable)
123122
return variable
124123

125124

Diff for: airflow/api_fastapi/core_api/serializers/pools.py

+19-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
from typing import Annotated, Callable
2121

22-
from pydantic import BaseModel, BeforeValidator, Field
22+
from pydantic import BaseModel, BeforeValidator, ConfigDict, Field
2323

2424

2525
def _call_function(function: Callable[[], int]) -> int:
@@ -31,14 +31,18 @@ def _call_function(function: Callable[[], int]) -> int:
3131
return function()
3232

3333

34-
class PoolResponse(BaseModel):
35-
"""Pool serializer for responses."""
34+
class BasePool(BaseModel):
35+
"""Base serializer for Pool."""
3636

37-
pool: str = Field(serialization_alias="name", validation_alias="pool")
37+
pool: str = Field(serialization_alias="name")
3838
slots: int
3939
description: str | None
4040
include_deferred: bool
4141

42+
43+
class PoolResponse(BasePool):
44+
"""Pool serializer for responses."""
45+
4246
occupied_slots: Annotated[int, BeforeValidator(_call_function)]
4347
running_slots: Annotated[int, BeforeValidator(_call_function)]
4448
queued_slots: Annotated[int, BeforeValidator(_call_function)]
@@ -52,3 +56,14 @@ class PoolCollectionResponse(BaseModel):
5256

5357
pools: list[PoolResponse]
5458
total_entries: int
59+
60+
61+
class PoolBody(BaseModel):
62+
"""Pool serializer for bodies."""
63+
64+
model_config = ConfigDict(populate_by_name=True)
65+
66+
name: str | None = Field(default=None, alias="pool")
67+
slots: int | None = None
68+
description: str | None = None
69+
include_deferred: bool | None = None

Diff for: airflow/ui/openapi-gen/queries/common.ts

+3
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,9 @@ export type DagServicePatchDagMutationResult = Awaited<
357357
export type VariableServicePatchVariableMutationResult = Awaited<
358358
ReturnType<typeof VariableService.patchVariable>
359359
>;
360+
export type PoolServicePatchPoolMutationResult = Awaited<
361+
ReturnType<typeof PoolService.patchPool>
362+
>;
360363
export type DagServiceDeleteDagMutationResult = Awaited<
361364
ReturnType<typeof DagService.deleteDag>
362365
>;

Diff for: airflow/ui/openapi-gen/queries/queries.ts

+53-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@ import {
1818
ProviderService,
1919
VariableService,
2020
} from "../requests/services.gen";
21-
import { DAGPatchBody, DagRunState, VariableBody } from "../requests/types.gen";
21+
import {
22+
DAGPatchBody,
23+
DagRunState,
24+
PoolBody,
25+
VariableBody,
26+
} from "../requests/types.gen";
2227
import * as Common from "./common";
2328

2429
/**
@@ -776,6 +781,53 @@ export const useVariableServicePatchVariable = <
776781
}) as unknown as Promise<TData>,
777782
...options,
778783
});
784+
/**
785+
* Patch Pool
786+
* Update a Pool.
787+
* @param data The data for the request.
788+
* @param data.poolName
789+
* @param data.requestBody
790+
* @param data.updateMask
791+
* @returns PoolResponse Successful Response
792+
* @throws ApiError
793+
*/
794+
export const usePoolServicePatchPool = <
795+
TData = Common.PoolServicePatchPoolMutationResult,
796+
TError = unknown,
797+
TContext = unknown,
798+
>(
799+
options?: Omit<
800+
UseMutationOptions<
801+
TData,
802+
TError,
803+
{
804+
poolName: string;
805+
requestBody: PoolBody;
806+
updateMask?: string[];
807+
},
808+
TContext
809+
>,
810+
"mutationFn"
811+
>,
812+
) =>
813+
useMutation<
814+
TData,
815+
TError,
816+
{
817+
poolName: string;
818+
requestBody: PoolBody;
819+
updateMask?: string[];
820+
},
821+
TContext
822+
>({
823+
mutationFn: ({ poolName, requestBody, updateMask }) =>
824+
PoolService.patchPool({
825+
poolName,
826+
requestBody,
827+
updateMask,
828+
}) as unknown as Promise<TData>,
829+
...options,
830+
});
779831
/**
780832
* Delete Dag
781833
* Delete the specific DAG.

Diff for: airflow/ui/openapi-gen/requests/schemas.gen.ts

+52
Original file line numberDiff line numberDiff line change
@@ -1436,6 +1436,58 @@ export const $PluginResponse = {
14361436
description: "Plugin serializer.",
14371437
} as const;
14381438

1439+
export const $PoolBody = {
1440+
properties: {
1441+
pool: {
1442+
anyOf: [
1443+
{
1444+
type: "string",
1445+
},
1446+
{
1447+
type: "null",
1448+
},
1449+
],
1450+
title: "Pool",
1451+
},
1452+
slots: {
1453+
anyOf: [
1454+
{
1455+
type: "integer",
1456+
},
1457+
{
1458+
type: "null",
1459+
},
1460+
],
1461+
title: "Slots",
1462+
},
1463+
description: {
1464+
anyOf: [
1465+
{
1466+
type: "string",
1467+
},
1468+
{
1469+
type: "null",
1470+
},
1471+
],
1472+
title: "Description",
1473+
},
1474+
include_deferred: {
1475+
anyOf: [
1476+
{
1477+
type: "boolean",
1478+
},
1479+
{
1480+
type: "null",
1481+
},
1482+
],
1483+
title: "Include Deferred",
1484+
},
1485+
},
1486+
type: "object",
1487+
title: "PoolBody",
1488+
description: "Pool serializer for bodies.",
1489+
} as const;
1490+
14391491
export const $PoolCollectionResponse = {
14401492
properties: {
14411493
pools: {

0 commit comments

Comments
 (0)