From 670871d6524104bcd30ee366dd567019897cdfdf Mon Sep 17 00:00:00 2001 From: Jason Thomas Date: Wed, 18 Feb 2026 16:53:10 -0700 Subject: [PATCH] Fix QuestDB ingress errors for Python conversions without converted_type Three bugs fixed: - __C columns from read_conversions with no converted_type (e.g. user Python conversions) were created as VARCHAR but not tracked in json_columns, causing INTEGER-to-VARCHAR cast errors on every row - Table reconciliation on plugin reload issued unnecessary ALTER for DECIMAL columns due to whitespace mismatch ("DECIMAL(20, 0)" vs "DECIMAL(20,0)") - handle_ingress_error cast values for VARCHAR columns but didn't persist the tracking, so the same error fired on every subsequent row Replaced QUESTDB_TYPE_MAP dict with _canonical_type() that normalizes via uppercase + whitespace stripping, preserving parameterized type identity. Co-Authored-By: Claude Opus 4.6 --- .../python/openc3/utilities/questdb_client.py | 31 +-- .../microservices/test_tsdb_microservice.py | 186 ++++++++++++++++++ 2 files changed, 204 insertions(+), 13 deletions(-) diff --git a/openc3/python/openc3/utilities/questdb_client.py b/openc3/python/openc3/utilities/questdb_client.py index 2c96efec17..26e1b867e2 100644 --- a/openc3/python/openc3/utilities/questdb_client.py +++ b/openc3/python/openc3/utilities/questdb_client.py @@ -466,16 +466,15 @@ def format_timestamp(utc_time, format_type): return None # Maps SQL type strings used in CREATE TABLE to canonical type names returned by SHOW COLUMNS - QUESTDB_TYPE_MAP = { - "float": "FLOAT", - "double": "DOUBLE", - "int": "INT", - "long": "LONG", - "varchar": "VARCHAR", - "DECIMAL(20, 0)": "DECIMAL", - "timestamp_ns": "TIMESTAMP_NS", - "SYMBOL": "SYMBOL", - } + @staticmethod + def _canonical_type(sql_type): + """Normalize a SQL type string for comparison. + + Uppercases and strips internal whitespace so that e.g. + 'DECIMAL(20, 0)' and 'DECIMAL(20,0)' compare equal, + while preserving parameters so DECIMAL(20,0) != DECIMAL(22,0). + """ + return re.sub(r"\s+", "", sql_type.upper()) def _get_existing_columns(self, table_name): """Query QuestDB for existing column names and types. @@ -584,13 +583,15 @@ def create_table(self, target_name, packet_name, packet, cmd_or_tlm="TLM", retai rc = item.get("read_conversion") converted_type = rc.get("converted_type") if rc else None converted_bit_size = rc.get("converted_bit_size", 0) if rc else 0 - col_type, _ = self._get_column_type_from_conversion( + col_type, needs_json = self._get_column_type_from_conversion( table_name, f"{item_name}__C", converted_type, converted_bit_size, ) desired_columns[f"{item_name}__C"] = col_type + if needs_json: + self.json_columns[f"{table_name}__{item_name}__C"] = True if item.get("format_string") or item.get("units"): desired_columns[f"{item_name}__F"] = "varchar" @@ -604,8 +605,9 @@ def create_table(self, target_name, packet_name, packet, cmd_or_tlm="TLM", retai try: with self.query.cursor() as cur: for col_name, desired_sql_type in desired_columns.items(): - desired_canonical = self.QUESTDB_TYPE_MAP.get(desired_sql_type, desired_sql_type.upper()) - existing_type = existing_columns.get(col_name) + desired_canonical = self._canonical_type(desired_sql_type) + existing_raw = existing_columns.get(col_name) + existing_type = self._canonical_type(existing_raw) if existing_raw else None if existing_type is None: # Column doesn't exist yet — add it @@ -991,6 +993,9 @@ def handle_ingress_error(self, error, table_name, columns, timestamp_ns): casted = self._cast_value_to_column_type(value, column_type) if casted is not None: columns[column_name] = casted + # Persist tracking so convert_value() handles future rows + if column_type in ("VARCHAR", "STRING"): + self.varchar_columns[f"{err_table_name}__{column_name}"] = True self._log_warn( f"QuestDB: Column {column_name} in table {err_table_name} " f"expected {column_type} but received {protocol_type}. " diff --git a/openc3/python/test/microservices/test_tsdb_microservice.py b/openc3/python/test/microservices/test_tsdb_microservice.py index ad6498d4d9..f791a1e0e0 100644 --- a/openc3/python/test/microservices/test_tsdb_microservice.py +++ b/openc3/python/test/microservices/test_tsdb_microservice.py @@ -1636,6 +1636,192 @@ def xread_side_effect(*args, **kwargs): self.assertIn("COSMOS_EXTRA", columns) self.assertEqual(columns["COSMOS_EXTRA"], json.dumps(extra_data)) + def test_canonical_type_normalizes_case_and_whitespace(self): + """Test _canonical_type uppercases and strips whitespace for consistent comparison""" + from openc3.utilities.questdb_client import QuestDBClient + + # Lowercase to uppercase + self.assertEqual(QuestDBClient._canonical_type("float"), "FLOAT") + self.assertEqual(QuestDBClient._canonical_type("double"), "DOUBLE") + self.assertEqual(QuestDBClient._canonical_type("int"), "INT") + self.assertEqual(QuestDBClient._canonical_type("long"), "LONG") + self.assertEqual(QuestDBClient._canonical_type("varchar"), "VARCHAR") + self.assertEqual(QuestDBClient._canonical_type("SYMBOL"), "SYMBOL") + self.assertEqual(QuestDBClient._canonical_type("timestamp_ns"), "TIMESTAMP_NS") + + # DECIMAL with different whitespace normalizes to the same string + self.assertEqual(QuestDBClient._canonical_type("DECIMAL(20, 0)"), "DECIMAL(20,0)") + self.assertEqual(QuestDBClient._canonical_type("DECIMAL(20,0)"), "DECIMAL(20,0)") + + # Different DECIMAL parameters remain distinct + self.assertNotEqual( + QuestDBClient._canonical_type("DECIMAL(20, 0)"), + QuestDBClient._canonical_type("DECIMAL(22, 0)"), + ) + + @patch("openc3.microservices.tsdb_microservice.get_tlm") + @patch("openc3.utilities.questdb_client.Sender") + @patch("openc3.utilities.questdb_client.psycopg.connect") + @patch("openc3.microservices.microservice.System") + def test_create_table_conversion_without_converted_type_uses_json( + self, mock_system, mock_psycopg, mock_sender, mock_get_tlm + ): + """Test that a read_conversion with no converted_type creates a varchar __C column registered for JSON""" + mock_query = Mock() + mock_psycopg.return_value = mock_query + mock_cursor = Mock() + mock_query.cursor.return_value.__enter__ = Mock(return_value=mock_cursor) + mock_query.cursor.return_value.__exit__ = Mock(return_value=False) + + # Simulates a user-defined Python conversion (like twos_comp_conversion.py) + # that does not declare converted_type + mock_get_tlm.return_value = { + "items": [ + { + "name": "ANGLE", + "data_type": "INT", + "bit_size": 16, + "read_conversion": { + "converted_type": None, + "converted_bit_size": 0, + }, + }, + ] + } + + model = MicroserviceModel( + "DEFAULT__TSDB__TEST", + scope="DEFAULT", + topics=["DEFAULT__DECOM__{TEST}__PKT"], + target_names=["TEST"], + ) + model.create() + + tsdb = TsdbMicroservice("DEFAULT__TSDB__TEST") + + create_table_calls = [call for call in mock_cursor.execute.call_args_list if "CREATE TABLE" in str(call)] + self.assertTrue(len(create_table_calls) > 0) + create_table_sql = str(create_table_calls[0]) + + # Raw column should be int, converted column should be varchar + self.assertIn('"ANGLE" int', create_table_sql) + self.assertIn('"ANGLE__C" varchar', create_table_sql) + + # The __C column should be registered for JSON serialization so integer values get stringified + self.assertIn("TLM__TEST__PKT__ANGLE__C", tsdb.questdb.json_columns) + + @patch("openc3.microservices.tsdb_microservice.get_tlm") + @patch("openc3.utilities.questdb_client.Sender") + @patch("openc3.utilities.questdb_client.psycopg.connect") + @patch("openc3.microservices.microservice.System") + def test_reconcile_skips_alter_when_decimal_types_match( + self, mock_system, mock_psycopg, mock_sender, mock_get_tlm + ): + """Test that reconciliation does not ALTER when existing DECIMAL(20,0) matches desired DECIMAL(20, 0)""" + mock_query = Mock() + mock_psycopg.return_value = mock_query + mock_cursor = Mock() + mock_query.cursor.return_value.__enter__ = Mock(return_value=mock_cursor) + mock_query.cursor.return_value.__exit__ = Mock(return_value=False) + + # Simulate an existing table with DECIMAL(20,0) (no space, as QuestDB returns) + mock_cursor.fetchall.return_value = [ + ("PACKET_TIMESECONDS", "TIMESTAMP_NS"), + ("RECEIVED_TIMESECONDS", "TIMESTAMP_NS"), + ("RECEIVED_COUNT", "LONG"), + ("COSMOS_DATA_TAG", "SYMBOL"), + ("BIGVAL", "DECIMAL(20,0)"), + ] + + mock_get_tlm.return_value = { + "items": [ + { + "name": "BIGVAL", + "data_type": "UINT", + "bit_size": 64, + }, + ] + } + + model = MicroserviceModel( + "DEFAULT__TSDB__TEST", + scope="DEFAULT", + topics=["DEFAULT__DECOM__{TEST}__PKT"], + target_names=["TEST"], + ) + model.create() + + TsdbMicroservice("DEFAULT__TSDB__TEST") + + # Should NOT have issued any ALTER COLUMN TYPE statements + alter_calls = [ + call for call in mock_cursor.execute.call_args_list if "ALTER" in str(call) and "TYPE" in str(call) + ] + self.assertEqual(len(alter_calls), 0, f"Should not ALTER matching DECIMAL types, but got: {alter_calls}") + + @patch("openc3.utilities.questdb_client.Sender") + @patch("openc3.utilities.questdb_client.psycopg.connect") + @patch("openc3.microservices.microservice.System") + def test_handle_ingress_error_persists_varchar_tracking(self, mock_system, mock_psycopg, mock_sender): + """Test that handle_ingress_error registers VARCHAR columns so future rows don't re-error""" + mock_ingest = Mock() + mock_sender.return_value = mock_ingest + mock_query = Mock() + mock_psycopg.return_value = mock_query + mock_cursor = Mock() + mock_query.cursor.return_value.__enter__ = Mock(return_value=mock_cursor) + mock_query.cursor.return_value.__exit__ = Mock(return_value=False) + + orig_xread = self.redis.xread + + def xread_side_effect(*args, **kwargs): + if "block" in kwargs: + kwargs.pop("block") + return orig_xread(*args, **kwargs) + + self.redis.xread = Mock(side_effect=xread_side_effect) + + model = MicroserviceModel( + "DEFAULT__TSDB__TEST", + scope="DEFAULT", + topics=["DEFAULT__DECOM__{INST}__HEALTH_STATUS"], + target_names=["INST"], + ) + model.create() + + tsdb = TsdbMicroservice("DEFAULT__TSDB__TEST") + + # Simulate INTEGER to VARCHAR cast error (the reported user issue) + error_msg = ( + "error in line 1: table: TLM__INST__HEALTH_STATUS, column: ITEM__C; " + 'cast error from protocol type: INTEGER to column type: VARCHAR","line":1' + ) + mock_ingest.row.side_effect = [IngressError(1, error_msg), None] + + json_data = {"ITEM__C": 42} + Topic.write_topic( + "DEFAULT__DECOM__{INST}__HEALTH_STATUS", + { + b"target_name": b"INST", + b"packet_name": b"HEALTH_STATUS", + b"time": str(int(time.time() * 1_000_000_000)).encode(), + b"json_data": json.dumps(json_data).encode(), + }, + "*", + 100, + ) + + for stdout in capture_io(): + tsdb.read_topics() + self.assertIn("expected VARCHAR but received INTEGER", stdout.getvalue()) + + # The column should now be tracked in varchar_columns so convert_value() will str() it + self.assertIn("TLM__INST__HEALTH_STATUS__ITEM__C", tsdb.questdb.varchar_columns) + + # Verify retry was called with the value cast to string + retry_call = mock_ingest.row.call_args_list[1] + self.assertEqual(retry_call[1]["columns"]["ITEM__C"], "42") + @patch("openc3.utilities.questdb_client.Sender") @patch("openc3.utilities.questdb_client.psycopg.connect") @patch("openc3.microservices.microservice.System")