
Connector cannot insert null value into Nullable(JSON) type #562

@taras-biletskyi

Description


Describe the bug

I cannot ingest null values into a Nullable(JSON) column. Non-null values are inserted correctly, and nulls worked fine when the column was Nullable(String).

Steps to reproduce

  1. Create a table with a Nullable(JSON) column:
    CREATE TABLE sandbox.cool_table_name (
        `before` Nullable(JSON)
    )
    ENGINE = MergeTree
    ORDER BY tuple()
  2. Try to import a null value for that column from Kafka into the table, e.g. a Kafka message produced by Debezium from MongoDB:
    {
        "schema": {
            "type": "struct",
            "fields": [
                {
                    "type": "string",
                    "optional": true,
                    "name": "io.debezium.data.Json",
                    "version": 1,
                    "field": "before"
                }
                // other fields' schemas...
            ],
            "optional": false,
            "name": "cool_table_name.Envelope_Extended"
        },
        "payload": {
            "before": null
            // other fields...
        }
    }
  3. See an error: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because the return value of "com.clickhouse.kafka.connect.sink.data.Data.getObject()" is null

Error log

2025-07-24 14:26:51.191	2025-07-24 11:26:51 ERROR [task-thread-local-dev-prod-clickhouse-from-dev-mongo-4] ClickHouseWriter:745 - Error inserting records
2025-07-24 14:26:51.191	java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because the return value of "com.clickhouse.kafka.connect.sink.data.Data.getObject()" is null
2025-07-24 14:26:51.191		at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWriteColValue(ClickHouseWriter.java:545) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.191		at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWriteCol(ClickHouseWriter.java:708) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.191		at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinaryV1(ClickHouseWriter.java:854) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.191		at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:731) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:211) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:55) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:143) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:102) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:48) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:606) ~[connect-runtime-4.0.0.jar:?]
2025-07-24 14:26:51.192		at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345) ~[connect-runtime-4.0.0.jar:?]
2025-07-24 14:26:51.192		at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247) ~[connect-runtime-4.0.0.jar:?]
2025-07-24 14:26:51.192		at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216) ~[connect-runtime-4.0.0.jar:?]
2025-07-24 14:26:51.192		at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226) ~[connect-runtime-4.0.0.jar:?]
2025-07-24 14:26:51.192		at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281) ~[connect-runtime-4.0.0.jar:?]
2025-07-24 14:26:51.192		at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238) ~[connect-runtime-4.0.0.jar:?]
2025-07-24 14:26:51.192		at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
2025-07-24 14:26:51.192		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
2025-07-24 14:26:51.192		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
2025-07-24 14:26:51.192		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
2025-07-24 14:26:51.192		at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
2025-07-24 14:26:51.192	2025-07-24 11:26:51 WARN  [task-thread-local-dev-prod-clickhouse-from-dev-mongo-4] Utils:65 - Deciding how to handle exception: Topic: [cool_table_name], Partition: [0], MinOffset: [0], MaxOffset: [43], (QueryId: [ad640e61-37ed-47e7-b3ea-10b0ad501699])
2025-07-24 14:26:51.192	2025-07-24 11:26:51 ERROR [task-thread-local-dev-prod-clickhouse-from-dev-mongo-4] Utils:120 - Errors tolerance is disabled, wrapping exception: Topic: [cool_table_name], Partition: [0], MinOffset: [0], MaxOffset: [43], (QueryId: [ad640e61-37ed-47e7-b3ea-10b0ad501699])
2025-07-24 14:26:51.192	2025-07-24 11:26:51 ERROR [task-thread-local-dev-prod-clickhouse-from-dev-mongo-4] WorkerSinkTask:634 - WorkerSinkTask{id=local-dev-prod-clickhouse-from-dev-mongo-4} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Number of records: 44
2025-07-24 14:26:51.192	java.lang.RuntimeException: Number of records: 44
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.util.Utils.handleException(Utils.java:122) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:56) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:606) ~[connect-runtime-4.0.0.jar:?]
2025-07-24 14:26:51.192		at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345) ~[connect-runtime-4.0.0.jar:?]
2025-07-24 14:26:51.192		at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247) ~[connect-runtime-4.0.0.jar:?]
2025-07-24 14:26:51.192		at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216) ~[connect-runtime-4.0.0.jar:?]
2025-07-24 14:26:51.192		at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226) ~[connect-runtime-4.0.0.jar:?]
2025-07-24 14:26:51.192		at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281) ~[connect-runtime-4.0.0.jar:?]
2025-07-24 14:26:51.192		at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238) ~[connect-runtime-4.0.0.jar:?]
2025-07-24 14:26:51.192		at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
2025-07-24 14:26:51.192		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
2025-07-24 14:26:51.192		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
2025-07-24 14:26:51.192		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
2025-07-24 14:26:51.192		at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
2025-07-24 14:26:51.192	Caused by: java.lang.RuntimeException: Topic: [cool_table_name], Partition: [0], MinOffset: [0], MaxOffset: [43], (QueryId: [ad640e61-37ed-47e7-b3ea-10b0ad501699])
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:57) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:143) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:102) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:48) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		... 12 more
2025-07-24 14:26:51.192	Caused by: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because the return value of "com.clickhouse.kafka.connect.sink.data.Data.getObject()" is null
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWriteColValue(ClickHouseWriter.java:545) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWriteCol(ClickHouseWriter.java:708) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinaryV1(ClickHouseWriter.java:854) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:731) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:211) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:55) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:143) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:102) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:48) ~[clickhouse-kafka-connect-v1.3.2-confluent.jar:v1.3.2]
2025-07-24 14:26:51.192		... 12 more
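For context, the stack trace shows doWriteColValue calling Data.getObject().getBytes(...) unconditionally, even when getObject() returns null. In ClickHouse's RowBinary format, a Nullable value is encoded as a one-byte marker (1 = NULL with no payload, 0 = a value follows), so a NULL should short-circuit before any getBytes call. A minimal sketch of the expected encoding (hypothetical helper names, not the actual connector source; with input_format_binary_read_json_as_string=1 the JSON value travels as a String, so String encoding applies):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class NullableRowBinarySketch {
    // RowBinary encodes a Nullable column value as a one-byte marker:
    // 1 = NULL (no payload follows), 0 = a value follows.
    static void writeNullableString(ByteArrayOutputStream out, String value) {
        if (value == null) {
            out.write(1); // NULL marker only; the NPE suggests this branch is missing
            return;
        }
        out.write(0); // non-NULL marker
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        writeVarUInt(out, bytes.length); // String payload is LEB128 length + bytes
        out.write(bytes, 0, bytes.length);
    }

    // Unsigned LEB128 varint, as RowBinary uses for string lengths.
    static void writeVarUInt(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeNullableString(out, null); // 1 byte: NULL marker
        writeNullableString(out, "{}"); // 4 bytes: marker + length + payload
        System.out.println(out.size()); // 5
    }
}
```

This is only an illustration of the wire format the writer should emit; the actual fix would be a null check in the connector's JSON/String branch before invoking getBytes.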

Configuration

Environment

  • Kafka version: Strimzi kafka:0.46.1-kafka-4.0.0
  • Kafka Connect configuration:
      config:
        hostname: ...
        port: 8443
        ssl: true
        username: ...
        password: ...
        database: sandbox
        topics: >-
          cool_table_name
        clickhouseSettings: "input_format_binary_read_json_as_string=1"
        value.converter: org.apache.kafka.connect.json.JsonConverter
        value.converter.schemas.enable: true
        exactlyOnce: "true"
        consumer.override.max.poll.records: 50000
        consumer.override.max.partition.fetch.bytes: 104857600
  • Env: k8s Strimzi Operator on k3d cluster

ClickHouse server

  • ClickHouse Server version: 25.4.1.37460.
  • ClickHouse Server non-default settings, if any: user-level settings such as SETTINGS input_format_binary_read_json_as_string = true, enable_json_type = true
  • ClickHouse Kafka Connect plugin version: 1.3.2
