Skip to content

json patch #4890

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 80 additions & 158 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ twine = ">=4.0.0,<7.0"
tomlkit = ">=0.12.4,<1.0"
lazy_loader = ">=0.4"
typing_extensions = ">=4.6.0"
jsonpatch = "^1.33"

[tool.poetry.group.dev.dependencies]
pytest = ">=7.1.2,<9.0"
Expand Down
47 changes: 29 additions & 18 deletions reflex/.templates/web/utils/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from "$/utils/context.js";
import debounce from "$/utils/helpers/debounce";
import throttle from "$/utils/helpers/throttle";
import { applyPatch } from "fast-json-patch/index.mjs";

// Endpoint URLs.
const EVENTURL = env.EVENT;
Expand Down Expand Up @@ -227,8 +228,8 @@ export const applyEvent = async (event, socket) => {
a.href = eval?.(
event.payload.url.replace(
"getBackendURL(env.UPLOAD)",
`"${getBackendURL(env.UPLOAD)}"`,
),
`"${getBackendURL(env.UPLOAD)}"`
)
);
}
a.download = event.payload.filename;
Expand Down Expand Up @@ -341,7 +342,7 @@ export const applyRestEvent = async (event, socket) => {
event.payload.files,
event.payload.upload_id,
event.payload.on_upload_progress,
socket,
socket
);
return false;
}
Expand Down Expand Up @@ -408,7 +409,7 @@ export const connect = async (
dispatch,
transports,
setConnectErrors,
client_storage = {},
client_storage = {}
) => {
// Get backend URL object from the endpoint.
const endpoint = getBackendURL(EVENTURL);
Expand Down Expand Up @@ -464,10 +465,20 @@ export const connect = async (
window.removeEventListener("pagehide", pagehideHandler);
});

const last_substate_info = {};

// On each received message, queue the updates and events.
socket.current.on("event", async (update) => {
for (const substate in update.delta) {
dispatch[substate](update.delta[substate]);
console.log(last_substate_info[substate]);
const new_substate_info = update.delta[substate].__patch
? applyPatch(
last_substate_info[substate],
update.delta[substate].__patch
).newDocument
: update.delta[substate];
last_substate_info[substate] = new_substate_info;
dispatch[substate](new_substate_info);
}
applyClientStorageDelta(client_storage, update.delta);
event_processing = !update.final;
Expand Down Expand Up @@ -499,7 +510,7 @@ export const uploadFiles = async (
files,
upload_id,
on_upload_progress,
socket,
socket
) => {
// return if there's no file to upload
if (files === undefined || files.length === 0) {
Expand Down Expand Up @@ -604,7 +615,7 @@ export const Event = (
name,
payload = {},
event_actions = {},
handler = null,
handler = null
) => {
return { name, payload, handler, event_actions };
};
Expand All @@ -631,7 +642,7 @@ export const hydrateClientStorage = (client_storage) => {
for (const state_key in client_storage.local_storage) {
const options = client_storage.local_storage[state_key];
const local_storage_value = localStorage.getItem(
options.name || state_key,
options.name || state_key
);
if (local_storage_value !== null) {
client_storage_values[state_key] = local_storage_value;
Expand All @@ -642,7 +653,7 @@ export const hydrateClientStorage = (client_storage) => {
for (const state_key in client_storage.session_storage) {
const session_options = client_storage.session_storage[state_key];
const session_storage_value = sessionStorage.getItem(
session_options.name || state_key,
session_options.name || state_key
);
if (session_storage_value != null) {
client_storage_values[state_key] = session_storage_value;
Expand All @@ -667,7 +678,7 @@ export const hydrateClientStorage = (client_storage) => {
const applyClientStorageDelta = (client_storage, delta) => {
// find the main state and check for is_hydrated
const unqualified_states = Object.keys(delta).filter(
(key) => key.split(".").length === 1,
(key) => key.split(".").length === 1
);
if (unqualified_states.length === 1) {
const main_state = delta[unqualified_states[0]];
Expand Down Expand Up @@ -701,7 +712,7 @@ const applyClientStorageDelta = (client_storage, delta) => {
const session_options = client_storage.session_storage[state_key];
sessionStorage.setItem(
session_options.name || state_key,
delta[substate][key],
delta[substate][key]
);
}
}
Expand All @@ -721,7 +732,7 @@ const applyClientStorageDelta = (client_storage, delta) => {
export const useEventLoop = (
dispatch,
initial_events = () => [],
client_storage = {},
client_storage = {}
) => {
const socket = useRef(null);
const router = useRouter();
Expand All @@ -735,7 +746,7 @@ export const useEventLoop = (

event_actions = events.reduce(
(acc, e) => ({ ...acc, ...e.event_actions }),
event_actions ?? {},
event_actions ?? {}
);

const _e = args.filter((o) => o?.preventDefault !== undefined)[0];
Expand Down Expand Up @@ -763,7 +774,7 @@ export const useEventLoop = (
debounce(
combined_name,
() => queueEvents(events, socket),
event_actions.debounce,
event_actions.debounce
);
} else {
queueEvents(events, socket);
Expand All @@ -782,7 +793,7 @@ export const useEventLoop = (
query,
asPath,
}))(router),
})),
}))
);
sentHydrate.current = true;
}
Expand Down Expand Up @@ -828,7 +839,7 @@ export const useEventLoop = (
dispatch,
["websocket"],
setConnectErrors,
client_storage,
client_storage
);
}
}
Expand Down Expand Up @@ -876,7 +887,7 @@ export const useEventLoop = (
vars[storage_to_state_map[e.key]] = e.newValue;
const event = Event(
`${state_name}.reflex___state____update_vars_internal_state.update_vars_internal`,
{ vars: vars },
{ vars: vars }
);
addEvents([event], e);
}
Expand Down Expand Up @@ -969,7 +980,7 @@ export const getRefValues = (refs) => {
return refs.map((ref) =>
ref.current
? ref.current.value || ref.current.getAttribute("aria-valuenow")
: null,
: null
);
};

Expand Down
2 changes: 1 addition & 1 deletion reflex/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -1407,7 +1407,7 @@ async def modify_state(self, token: str) -> AsyncIterator[BaseState]:
async with self.state_manager.modify_state(token) as state:
# No other event handler can modify the state while in this context.
yield state
delta = state.get_delta()
delta = state.get_delta(token=token)
if delta:
# When the state is modified reset dirty status and emit the delta to the frontend.
state._clean()
Expand Down
8 changes: 4 additions & 4 deletions reflex/compiler/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
)
from reflex.components.component import Component, ComponentStyle, CustomComponent
from reflex.istate.storage import Cookie, LocalStorage, SessionStorage
from reflex.state import BaseState, _resolve_delta
from reflex.state import BaseState, StateDelta, _resolve_delta
from reflex.style import Style
from reflex.utils import console, format, imports, path_ops
from reflex.utils.exec import is_in_app_harness
Expand Down Expand Up @@ -187,7 +187,7 @@ def compile_state(state: Type[BaseState]) -> dict:
Returns:
A dictionary of the compiled state.
"""
initial_state = state(_reflex_internal_init=True).dict(initial=True)
initial_state = StateDelta(state(_reflex_internal_init=True).dict(initial=True))
try:
_ = asyncio.get_running_loop()
except RuntimeError:
Expand All @@ -202,10 +202,10 @@ def compile_state(state: Type[BaseState]) -> dict:
console.warn(
f"Had to get initial state in a thread 🤮 {resolved_initial_state}",
)
return resolved_initial_state
return resolved_initial_state.data

# Normally the compile runs before any event loop starts, we asyncio.run is available for calling.
return asyncio.run(_resolve_delta(initial_state))
return asyncio.run(_resolve_delta(initial_state)).data


def _compile_client_storage_field(
Expand Down
3 changes: 3 additions & 0 deletions reflex/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,9 @@ class EnvironmentVariables:
# Used by flexgen to enumerate the pages.
REFLEX_ADD_ALL_ROUTES_ENDPOINT: EnvVar[bool] = env_var(False)

# Use the JSON patch format for websocket messages.
REFLEX_USE_JSON_PATCH: EnvVar[bool] = env_var(False)


environment = EnvironmentVariables()

Expand Down
1 change: 1 addition & 0 deletions reflex/constants/installer.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ class Commands(SimpleNamespace):
"react-dom": "19.0.0",
"react-focus-lock": "2.13.6",
"socket.io-client": "4.8.1",
"fast-json-patch": "3.1.1",
"universal-cookie": "7.2.2",
}
DEV_DEPENDENCIES = {
Expand Down
10 changes: 8 additions & 2 deletions reflex/middleware/hydrate_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from reflex import constants
from reflex.event import Event, get_hydrate_event
from reflex.middleware.middleware import Middleware
from reflex.state import BaseState, StateUpdate, _resolve_delta
from reflex.state import BaseState, StateDelta, StateUpdate, _resolve_delta

if TYPE_CHECKING:
from reflex.app import App
Expand Down Expand Up @@ -42,7 +42,13 @@ async def preprocess(
setattr(state, constants.CompileVars.IS_HYDRATED, False)

# Get the initial state.
delta = await _resolve_delta(state.dict())
delta = await _resolve_delta(
StateDelta(
state.dict(),
reflex_delta_token=state.router.session.client_token,
flush=True,
)
)
# since a full dict was captured, clean any dirtiness
state._clean()

Expand Down
Loading