Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions openc3/python/openc3/utilities/questdb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,16 +466,15 @@ def format_timestamp(utc_time, format_type):
return None

# Maps SQL type strings used in CREATE TABLE to canonical type names returned by SHOW COLUMNS
QUESTDB_TYPE_MAP = {
"float": "FLOAT",
"double": "DOUBLE",
"int": "INT",
"long": "LONG",
"varchar": "VARCHAR",
"DECIMAL(20, 0)": "DECIMAL",
"timestamp_ns": "TIMESTAMP_NS",
"SYMBOL": "SYMBOL",
}
@staticmethod
def _canonical_type(sql_type):
"""Normalize a SQL type string for comparison.

Uppercases and strips internal whitespace so that e.g.
'DECIMAL(20, 0)' and 'DECIMAL(20,0)' compare equal,
while preserving parameters so DECIMAL(20,0) != DECIMAL(22,0).
"""
return re.sub(r"\s+", "", sql_type.upper())

def _get_existing_columns(self, table_name):
"""Query QuestDB for existing column names and types.
Expand Down Expand Up @@ -584,13 +583,15 @@ def create_table(self, target_name, packet_name, packet, cmd_or_tlm="TLM", retai
rc = item.get("read_conversion")
converted_type = rc.get("converted_type") if rc else None
converted_bit_size = rc.get("converted_bit_size", 0) if rc else 0
col_type, _ = self._get_column_type_from_conversion(
col_type, needs_json = self._get_column_type_from_conversion(
table_name,
f"{item_name}__C",
converted_type,
converted_bit_size,
)
desired_columns[f"{item_name}__C"] = col_type
if needs_json:
self.json_columns[f"{table_name}__{item_name}__C"] = True

if item.get("format_string") or item.get("units"):
desired_columns[f"{item_name}__F"] = "varchar"
Expand All @@ -604,8 +605,9 @@ def create_table(self, target_name, packet_name, packet, cmd_or_tlm="TLM", retai
try:
with self.query.cursor() as cur:
for col_name, desired_sql_type in desired_columns.items():
desired_canonical = self.QUESTDB_TYPE_MAP.get(desired_sql_type, desired_sql_type.upper())
existing_type = existing_columns.get(col_name)
desired_canonical = self._canonical_type(desired_sql_type)
existing_raw = existing_columns.get(col_name)
existing_type = self._canonical_type(existing_raw) if existing_raw else None

if existing_type is None:
# Column doesn't exist yet — add it
Expand Down Expand Up @@ -991,6 +993,9 @@ def handle_ingress_error(self, error, table_name, columns, timestamp_ns):
casted = self._cast_value_to_column_type(value, column_type)
if casted is not None:
columns[column_name] = casted
# Persist tracking so convert_value() handles future rows
if column_type in ("VARCHAR", "STRING"):
self.varchar_columns[f"{err_table_name}__{column_name}"] = True
self._log_warn(
f"QuestDB: Column {column_name} in table {err_table_name} "
f"expected {column_type} but received {protocol_type}. "
Expand Down
186 changes: 186 additions & 0 deletions openc3/python/test/microservices/test_tsdb_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,192 @@ def xread_side_effect(*args, **kwargs):
self.assertIn("COSMOS_EXTRA", columns)
self.assertEqual(columns["COSMOS_EXTRA"], json.dumps(extra_data))

def test_canonical_type_normalizes_case_and_whitespace(self):
"""Test _canonical_type uppercases and strips whitespace for consistent comparison"""
from openc3.utilities.questdb_client import QuestDBClient

# Lowercase to uppercase
self.assertEqual(QuestDBClient._canonical_type("float"), "FLOAT")
self.assertEqual(QuestDBClient._canonical_type("double"), "DOUBLE")
self.assertEqual(QuestDBClient._canonical_type("int"), "INT")
self.assertEqual(QuestDBClient._canonical_type("long"), "LONG")
self.assertEqual(QuestDBClient._canonical_type("varchar"), "VARCHAR")
self.assertEqual(QuestDBClient._canonical_type("SYMBOL"), "SYMBOL")
self.assertEqual(QuestDBClient._canonical_type("timestamp_ns"), "TIMESTAMP_NS")

# DECIMAL with different whitespace normalizes to the same string
self.assertEqual(QuestDBClient._canonical_type("DECIMAL(20, 0)"), "DECIMAL(20,0)")
self.assertEqual(QuestDBClient._canonical_type("DECIMAL(20,0)"), "DECIMAL(20,0)")

# Different DECIMAL parameters remain distinct
self.assertNotEqual(
QuestDBClient._canonical_type("DECIMAL(20, 0)"),
QuestDBClient._canonical_type("DECIMAL(22, 0)"),
)

@patch("openc3.microservices.tsdb_microservice.get_tlm")
@patch("openc3.utilities.questdb_client.Sender")
@patch("openc3.utilities.questdb_client.psycopg.connect")
@patch("openc3.microservices.microservice.System")
def test_create_table_conversion_without_converted_type_uses_json(
self, mock_system, mock_psycopg, mock_sender, mock_get_tlm
):
"""Test that a read_conversion with no converted_type creates a varchar __C column registered for JSON"""
mock_query = Mock()
mock_psycopg.return_value = mock_query
mock_cursor = Mock()
mock_query.cursor.return_value.__enter__ = Mock(return_value=mock_cursor)
mock_query.cursor.return_value.__exit__ = Mock(return_value=False)

# Simulates a user-defined Python conversion (like twos_comp_conversion.py)
# that does not declare converted_type
mock_get_tlm.return_value = {
"items": [
{
"name": "ANGLE",
"data_type": "INT",
"bit_size": 16,
"read_conversion": {
"converted_type": None,
"converted_bit_size": 0,
},
},
]
}

model = MicroserviceModel(
"DEFAULT__TSDB__TEST",
scope="DEFAULT",
topics=["DEFAULT__DECOM__{TEST}__PKT"],
target_names=["TEST"],
)
model.create()

tsdb = TsdbMicroservice("DEFAULT__TSDB__TEST")

create_table_calls = [call for call in mock_cursor.execute.call_args_list if "CREATE TABLE" in str(call)]
self.assertTrue(len(create_table_calls) > 0)
create_table_sql = str(create_table_calls[0])

# Raw column should be int, converted column should be varchar
self.assertIn('"ANGLE" int', create_table_sql)
self.assertIn('"ANGLE__C" varchar', create_table_sql)

# The __C column should be registered for JSON serialization so integer values get stringified
self.assertIn("TLM__TEST__PKT__ANGLE__C", tsdb.questdb.json_columns)

@patch("openc3.microservices.tsdb_microservice.get_tlm")
@patch("openc3.utilities.questdb_client.Sender")
@patch("openc3.utilities.questdb_client.psycopg.connect")
@patch("openc3.microservices.microservice.System")
def test_reconcile_skips_alter_when_decimal_types_match(
self, mock_system, mock_psycopg, mock_sender, mock_get_tlm
):
"""Test that reconciliation does not ALTER when existing DECIMAL(20,0) matches desired DECIMAL(20, 0)"""
mock_query = Mock()
mock_psycopg.return_value = mock_query
mock_cursor = Mock()
mock_query.cursor.return_value.__enter__ = Mock(return_value=mock_cursor)
mock_query.cursor.return_value.__exit__ = Mock(return_value=False)

# Simulate an existing table with DECIMAL(20,0) (no space, as QuestDB returns)
mock_cursor.fetchall.return_value = [
("PACKET_TIMESECONDS", "TIMESTAMP_NS"),
("RECEIVED_TIMESECONDS", "TIMESTAMP_NS"),
("RECEIVED_COUNT", "LONG"),
("COSMOS_DATA_TAG", "SYMBOL"),
("BIGVAL", "DECIMAL(20,0)"),
]

mock_get_tlm.return_value = {
"items": [
{
"name": "BIGVAL",
"data_type": "UINT",
"bit_size": 64,
},
]
}

model = MicroserviceModel(
"DEFAULT__TSDB__TEST",
scope="DEFAULT",
topics=["DEFAULT__DECOM__{TEST}__PKT"],
target_names=["TEST"],
)
model.create()

TsdbMicroservice("DEFAULT__TSDB__TEST")

# Should NOT have issued any ALTER COLUMN TYPE statements
alter_calls = [
call for call in mock_cursor.execute.call_args_list if "ALTER" in str(call) and "TYPE" in str(call)
]
self.assertEqual(len(alter_calls), 0, f"Should not ALTER matching DECIMAL types, but got: {alter_calls}")

@patch("openc3.utilities.questdb_client.Sender")
@patch("openc3.utilities.questdb_client.psycopg.connect")
@patch("openc3.microservices.microservice.System")
def test_handle_ingress_error_persists_varchar_tracking(self, mock_system, mock_psycopg, mock_sender):
"""Test that handle_ingress_error registers VARCHAR columns so future rows don't re-error"""
mock_ingest = Mock()
mock_sender.return_value = mock_ingest
mock_query = Mock()
mock_psycopg.return_value = mock_query
mock_cursor = Mock()
mock_query.cursor.return_value.__enter__ = Mock(return_value=mock_cursor)
mock_query.cursor.return_value.__exit__ = Mock(return_value=False)

orig_xread = self.redis.xread

def xread_side_effect(*args, **kwargs):
if "block" in kwargs:
kwargs.pop("block")
return orig_xread(*args, **kwargs)

self.redis.xread = Mock(side_effect=xread_side_effect)

model = MicroserviceModel(
"DEFAULT__TSDB__TEST",
scope="DEFAULT",
topics=["DEFAULT__DECOM__{INST}__HEALTH_STATUS"],
target_names=["INST"],
)
model.create()

tsdb = TsdbMicroservice("DEFAULT__TSDB__TEST")

# Simulate INTEGER to VARCHAR cast error (the reported user issue)
error_msg = (
"error in line 1: table: TLM__INST__HEALTH_STATUS, column: ITEM__C; "
'cast error from protocol type: INTEGER to column type: VARCHAR","line":1'
)
mock_ingest.row.side_effect = [IngressError(1, error_msg), None]

json_data = {"ITEM__C": 42}
Topic.write_topic(
"DEFAULT__DECOM__{INST}__HEALTH_STATUS",
{
b"target_name": b"INST",
b"packet_name": b"HEALTH_STATUS",
b"time": str(int(time.time() * 1_000_000_000)).encode(),
b"json_data": json.dumps(json_data).encode(),
},
"*",
100,
)

for stdout in capture_io():
tsdb.read_topics()
self.assertIn("expected VARCHAR but received INTEGER", stdout.getvalue())

# The column should now be tracked in varchar_columns so convert_value() will str() it
self.assertIn("TLM__INST__HEALTH_STATUS__ITEM__C", tsdb.questdb.varchar_columns)

# Verify retry was called with the value cast to string
retry_call = mock_ingest.row.call_args_list[1]
self.assertEqual(retry_call[1]["columns"]["ITEM__C"], "42")

@patch("openc3.utilities.questdb_client.Sender")
@patch("openc3.utilities.questdb_client.psycopg.connect")
@patch("openc3.microservices.microservice.System")
Expand Down
Loading