[Bug] Spark is unable to write to Doris hosted on K8s ( Compute Storage Decoupled mode ) #325

@Nj-kol

Description

Search before asking

  • I had searched in the issues and found no similar issues.

Version

25.1.0

What's Wrong?

I am running Doris on Kubernetes (compute-storage decoupled mode) and want to write to it from Spark, but I get the exception below:

java.net.UnknownHostException: doris-disaggregated-cluster-cg1-0.doris-disaggregated-cluster-cg1.doris.svc.cluster.local
        at java.base/java.net.InetAddress$CachedLookup.get(InetAddress.java:988)

I believe this happens because Spark first contacts the FE to get the list of backend/compute nodes, and then, once it has their hosts and ports, writes directly to the compute nodes.

The issue is that in Kubernetes, when compute groups spin up, they register themselves with the FE nodes using their internal FQDNs, which are not reachable from outside the cluster.
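The failure mode can be reproduced outside Spark with plain JDK name resolution; from the client's point of view, the in-cluster FQDN behaves like any name that is simply absent from its DNS (the host name below is the one from the stack trace):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class BeHostCheck {
    /** Returns true if this client can resolve the given host name. */
    public static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The BE host the FE hands back to the connector. Outside the
        // cluster this FQDN is not in DNS, so resolution fails exactly
        // like the UnknownHostException in the stack trace above.
        String beHost = "doris-disaggregated-cluster-cg1-0."
                + "doris-disaggregated-cluster-cg1.doris.svc.cluster.local";
        System.out.println(beHost + " resolvable: " + resolves(beHost));
    }
}
```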

mysql> show backends;
+---------------+-------------------------------------------------------------------------------------------+---------------+--------+----------+----------+--------------------+---------------------+---------------------+-------+----------------------+-----------+------------------+-------------------+---------------+---------------+---------+----------------+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+----------+----------+---------+
| BackendId     | Host                                                                                      | HeartbeatPort | BePort | HttpPort | BrpcPort | ArrowFlightSqlPort | LastStartTime       | LastHeartbeat       | Alive | SystemDecommissioned | TabletNum | DataUsedCapacity | TrashUsedCapacity | AvailCapacity | TotalCapacity | UsedPct | MaxDiskUsedPct | RemoteUsedCapacity | Tag                                                                                                                                                                                                                      | ErrMsg | Version                     | Status                                                                                                                                                                                      | HeartbeatFailureCounter | NodeRole | CpuCores | Memory  |
+---------------+-------------------------------------------------------------------------------------------+---------------+--------+----------+----------+--------------------+---------------------+---------------------+-------+----------------------+-----------+------------------+-------------------+---------------+---------------+---------+----------------+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+----------+----------+---------+
| 1750574093851 | doris-disaggregated-cluster-cg1-0.doris-disaggregated-cluster-cg1.doris.svc.cluster.local | 9050          | 9060   | 8040     | 8060     | -1                 | 2025-06-22 06:35:21 | 2025-06-22 06:41:07 | true  | false                | 33        | 0.000            | 0.000             | 1.000 B       | 0.000         | 0.00 %  | 0.00 %         | 0.000              | {"cloud_unique_id" : "1:1030729153:dnDaSeQC", "compute_group_status" : "NORMAL", "private_endpoint" : "", "compute_group_name" : "cg1", "location" : "default", "public_endpoint" : "", "compute_group_id" : "xdReRUSR"} |        | doris-3.0.5-rc01-e277cfb83f | {"lastSuccessReportTabletsTime":"N/A","lastStreamLoadTime":-1,"isQueryDisabled":false,"isLoadDisabled":false,"isActive":true,"currentFragmentNum":0,"lastFragmentUpdateTime":1750574458773} | 0                       | mix      | 32       | 4.00 GB |
+---------------+-------------------------------------------------------------------------------------------+---------------+--------+----------+----------+--------------------+---------------------+---------------------+-------+----------------------+-----------+------------------+-------------------+---------------+---------------+---------+----------------+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+----------+----------+---------+

What You Expected?

Just as there are `doris.fenodes` and `doris.query.port` options for the FE, there should be a similar option to override the backend/compute node addresses.

On the Kubernetes side, the backend/compute nodes could then be exposed via an ingress to solve this issue.
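As a sketch of what such an option could look like (the option name `doris.be.host.mapping`, its format, and the helper class below are hypothetical, not part of the connector today): the connector would rewrite each BE address returned by the FE through a user-supplied internal-to-external map before opening a connection.

```java
import java.util.HashMap;
import java.util.Map;

public class BeAddressMapper {
    private final Map<String, String> mapping = new HashMap<>();

    /**
     * Parses a mapping spec of the form
     * "internalHost:port=externalHost:port,..." -- a hypothetical format,
     * modeled on similar address-override options in other connectors.
     */
    public BeAddressMapper(String spec) {
        for (String pair : spec.split(",")) {
            String[] kv = pair.split("=", 2);
            mapping.put(kv[0].trim(), kv[1].trim());
        }
    }

    /** Returns the external address for a BE, or the original if unmapped. */
    public String rewrite(String beAddress) {
        return mapping.getOrDefault(beAddress, beAddress);
    }
}
```

With something like `.option("doris.be.host.mapping", "cg1-0...svc.cluster.local:8040=doris-be.example.com:30040")` (hypothetical), an external client could reach BEs exposed through an ingress or NodePort without any DNS changes.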

How to Reproduce?

Create a table in Doris

CREATE TABLE IF NOT EXISTS demo.user_data (
    user_id INT,
    name STRING,
    age INT,
    update_time DATETIME
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 3
PROPERTIES (
    "replication_num" = "1",
    "enable_unique_key_merge_on_write" = "true"
);

Launch Spark shell

spark-shell --packages org.apache.doris:spark-doris-connector-spark-3.5:25.1.0,mysql:mysql-connector-java:8.0.33 

Code :

import org.apache.spark.sql.SparkSession
import spark.implicits._
import org.apache.spark.sql.SaveMode

// Sample batch data
val batchData = Seq(
  (1, "Alice", 30, "2024-01-01 10:00:00"),
  (2, "Bob", 25, "2024-01-01 11:00:00")
)

val df = batchData.toDF("user_id", "name", "age", "update_time")

// doris.fenodes must point at an FE host:http_port reachable from Spark,
// placeholder below -- adjust to your deployment
val feNodes = "<fe-host>:8030"

df.write.format("doris")
  .option("doris.table.identifier", "demo.user_data")
  .option("doris.fenodes", feNodes)
  .option("user", "root")
  .option("password", "")
  .option("doris.write.fields", "user_id,name,age,update_time")
  .mode(SaveMode.Overwrite)
  .save()

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
