Skip to content

Commit 4ef5a8c

Browse files
morningmanZhao Chun
authored andcommitted
Limit the memory consumption of broker scan node (#1996)
If memory exceed limit, no more row batch will be pushed to batch queue
1 parent ac16318 commit 4ef5a8c

File tree

12 files changed

+58
-32
lines changed

12 files changed

+58
-32
lines changed

be/src/exec/broker_scan_node.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ BrokerScanNode::BrokerScanNode(
3939
_tuple_desc(nullptr),
4040
_num_running_scanners(0),
4141
_scan_finished(false),
42-
_max_buffered_batches(1024),
42+
_max_buffered_batches(32),
4343
_wait_scanner_timer(nullptr) {
4444
}
4545

@@ -360,7 +360,11 @@ Status BrokerScanNode::scanner_scan(
360360
while (_process_status.ok() &&
361361
!_scan_finished.load() &&
362362
!_runtime_state->is_cancelled() &&
363-
_batch_queue.size() >= _max_buffered_batches) {
363+
// stop pushing more batch if
364+
// 1. too many batches in queue, or
365+
// 2. at least one batch in queue and memory exceed limit.
366+
(_batch_queue.size() >= _max_buffered_batches
367+
|| (mem_tracker()->limit_exceeded() && !_batch_queue.empty()))) {
364368
_queue_writer_cond.wait_for(l, std::chrono::seconds(1));
365369
}
366370
// Process already set failed, so we just return OK

be/src/exec/olap_scan_node.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1199,7 +1199,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
11991199
row_batch->set_scanner_id(scanner->id());
12001200
status = scanner->get_batch(_runtime_state, row_batch, &eos);
12011201
if (!status.ok()) {
1202-
LOG(WARNING) << "Scan thread read OlapScanner failed!";
1202+
LOG(WARNING) << "Scan thread read OlapScanner failed: " << status.to_string();
12031203
eos = true;
12041204
break;
12051205
}

be/src/olap/task/engine_storage_migration_task.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ OLAPStatus EngineStorageMigrationTask::_storage_medium_migrate(
110110
}
111111
tablet->release_header_lock();
112112

113-
// generate schema hash path where files will be migrated
113+
// get a random store of specified storage medium
114114
auto stores = StorageEngine::instance()->get_stores_for_create_tablet(storage_medium);
115115
if (stores.empty()) {
116116
res = OLAP_ERR_INVALID_ROOT_PATH;

fe/src/main/java/org/apache/doris/alter/AlterJob.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,8 @@ public boolean isPreviousLoadFinished() {
289289
if (isPreviousLoadFinished) {
290290
return true;
291291
} else {
292-
isPreviousLoadFinished = Catalog.getCurrentGlobalTransactionMgr()
293-
.isPreviousTransactionsFinished(transactionId, dbId);
292+
isPreviousLoadFinished = Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
293+
transactionId, dbId);
294294
return isPreviousLoadFinished;
295295
}
296296
}

fe/src/main/java/org/apache/doris/analysis/DateLiteral.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ private void init(String s, Type type) throws AnalysisException {
203203
second = dateTime.getSecondOfMinute();
204204
this.type = type;
205205
} catch (Exception ex) {
206-
throw new AnalysisException("date literal [" + s + "] is valid");
206+
throw new AnalysisException("date literal [" + s + "] is invalid");
207207
}
208208
}
209209

@@ -542,7 +542,6 @@ private static DateTimeFormatterBuilder formatBuilder(String pattern) throws Ana
542542

543543
public DateLiteral plusDays(int day) throws AnalysisException {
544544
LocalDateTime dateTime;
545-
DateTimeFormatterBuilder builder = new DateTimeFormatterBuilder();
546545
if (type == Type.DATE) {
547546
dateTime = DATE_FORMATTER.parseLocalDateTime(getStringValue()).plusDays(day);
548547
} else {

fe/src/main/java/org/apache/doris/analysis/InsertStmt.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949

5050
import com.google.common.base.Joiner;
5151
import com.google.common.base.Preconditions;
52+
import com.google.common.base.Strings;
5253
import com.google.common.collect.Lists;
5354
import com.google.common.collect.Maps;
5455
import com.google.common.collect.Sets;
@@ -93,6 +94,9 @@ public class InsertStmt extends DdlStmt {
9394
private Boolean isRepartition;
9495
private boolean isStreaming = false;
9596
private String label = null;
97+
private boolean isUserSpecifiedLabel = false;
98+
// uuid will be generated at analysis phase, and be used as loadid and query id of insert plan
99+
private UUID uuid;
96100

97101
private Map<Long, Integer> indexIdToSchemaHash = null;
98102

@@ -128,6 +132,10 @@ public InsertStmt(InsertTarget target, String label, List<String> cols, InsertSo
128132
this.queryStmt = source.getQueryStmt();
129133
this.planHints = hints;
130134
this.targetColumnNames = cols;
135+
136+
if (!Strings.isNullOrEmpty(label)) {
137+
isUserSpecifiedLabel = true;
138+
}
131139
}
132140

133141
// Ctor for CreateTableAsSelectStmt
@@ -216,8 +224,12 @@ public String getLabel() {
216224
return label;
217225
}
218226

219-
public boolean hasLabel() {
220-
return label != null;
227+
public boolean isUserSpecifiedLabel() {
228+
return isUserSpecifiedLabel;
229+
}
230+
231+
public UUID getUUID() {
232+
return uuid;
221233
}
222234

223235
// Only valid when this statement is streaming
@@ -260,19 +272,19 @@ public void analyze(Analyzer analyzer) throws UserException {
260272
// create data sink
261273
createDataSink();
262274

275+
uuid = UUID.randomUUID();
276+
if (Strings.isNullOrEmpty(label)) {
277+
label = "insert_" + uuid.toString();
278+
}
279+
263280
if (targetTable instanceof OlapTable) {
264281
String dbName = tblName.getDb();
265282
// check exist
266283
db = analyzer.getCatalog().getDb(dbName);
267-
// although the insert stmt maybe failed at next stage, but has to begin transaction here
268-
// if get transactionid at add job stage, the transaction id maybe a little larger, it maybe error at alter job to check
269-
// if all previous job finished
270-
UUID uuid = UUID.randomUUID();
271-
String jobLabel = "insert_" + uuid;
272284
LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
273285
long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
274286
transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
275-
jobLabel, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond);
287+
label, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond);
276288
OlapTableSink sink = (OlapTableSink) dataSink;
277289
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
278290
sink.init(loadId, transactionId, db.getId());

fe/src/main/java/org/apache/doris/clone/TabletScheduler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -882,7 +882,8 @@ private void deleteReplicaInternal(TabletSchedCtx tabletCtx, Replica replica, St
882882
throw new SchedException(Status.SCHEDULE_FAILED, "set watermark txn " + nextTxnId);
883883
} else if (replica.getState() == ReplicaState.DECOMMISSION && replica.getWatermarkTxnId() != -1) {
884884
long watermarkTxnId = replica.getWatermarkTxnId();
885-
if (!Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId, tabletCtx.getDbId())) {
885+
if (!Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId,
886+
tabletCtx.getDbId())) {
886887
throw new SchedException(Status.SCHEDULE_FAILED, "wait txn before " + watermarkTxnId + " to be finished");
887888
}
888889
}

fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.doris.transaction.TransactionState;
5050

5151
import com.google.common.base.Joiner;
52+
import com.google.common.base.Strings;
5253
import com.google.common.collect.Lists;
5354
import com.google.common.collect.Sets;
5455

@@ -79,7 +80,7 @@ public class BrokerLoadJob extends LoadJob {
7980
// this param is used to persist the expr of columns
8081
// the origin stmt is persisted instead of columns expr
8182
// the expr of columns will be reanalyze when the log is replayed
82-
private String originStmt;
83+
private String originStmt = "";
8384

8485
// include broker desc and data desc
8586
private PullLoadSourceInfo dataSourceInfo = new PullLoadSourceInfo();
@@ -96,7 +97,7 @@ private BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, String ori
9697
super(dbId, label);
9798
this.timeoutSecond = Config.broker_load_default_timeout_second;
9899
this.brokerDesc = brokerDesc;
99-
this.originStmt = originStmt;
100+
this.originStmt = Strings.nullToEmpty(originStmt);
100101
this.jobType = EtlJobType.BROKER;
101102
this.authorizationInfo = gatherAuthInfo();
102103
}
@@ -258,7 +259,7 @@ public void onTaskFailed(long taskId, FailMsg failMsg) {
258259
*/
259260
@Override
260261
public void analyze() {
261-
if (originStmt == null) {
262+
if (Strings.isNullOrEmpty(originStmt)) {
262263
return;
263264
}
264265
// Reset dataSourceInfo, it will be re-created in analyze
@@ -511,6 +512,8 @@ public void readFields(DataInput in) throws IOException {
511512

512513
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_58) {
513514
originStmt = Text.readString(in);
515+
} else {
516+
originStmt = "";
514517
}
515518
// The origin stmt does not be analyzed in here.
516519
// The reason is that it will thrown MetaNotFoundException when the tableId could not be found by tableName.

fe/src/main/java/org/apache/doris/qe/StmtExecutor.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -592,15 +592,11 @@ private void handleInsertStmt() throws Exception {
592592
}
593593

594594
long createTime = System.currentTimeMillis();
595-
UUID uuid = UUID.randomUUID();
596-
String label = insertStmt.getLabel();
597-
if (label == null) {
598-
// if label is not set, use the uuid as label
599-
label = uuid.toString();
600-
}
601-
602595
Throwable throwable = null;
603596

597+
UUID uuid = insertStmt.getUUID();
598+
String label = insertStmt.getLabel();
599+
604600
long loadedRows = 0;
605601
int filteredRows = 0;
606602
try {
@@ -676,7 +672,7 @@ private void handleInsertStmt() throws Exception {
676672
LOG.warn("errors when abort txn", abortTxnException);
677673
}
678674

679-
if (!Config.using_old_load_usage_pattern && !insertStmt.hasLabel()) {
675+
if (!Config.using_old_load_usage_pattern && !insertStmt.isUserSpecifiedLabel()) {
680676
// if not using old usage pattern, or user not specify label,
681677
// the exception will be thrown to user directly without a label
682678
StringBuilder sb = new StringBuilder(t.getMessage());
@@ -700,7 +696,7 @@ private void handleInsertStmt() throws Exception {
700696
// 2. using_old_load_usage_pattern is set to true, means a label will be returned for user to show load.
701697
// 3. has filtered rows. so a label should be returned for user to show
702698
// 4. user specify a label for insert stmt
703-
if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern || filteredRows > 0 || insertStmt.hasLabel()) {
699+
if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern || filteredRows > 0 || insertStmt.isUserSpecifiedLabel()) {
704700
try {
705701
context.getCatalog().getLoadManager().recordFinishedLoadJob(
706702
label,

fe/src/main/java/org/apache/doris/rewrite/FEFunctions.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,18 +120,28 @@ public static IntLiteral unixTimestamp(LiteralExpr arg) throws AnalysisException
120120
return new IntLiteral(unixTime, Type.INT);
121121
}
122122

123+
@FEFunction(name = "unix_timestamp", argTypes = { "DATE" }, returnType = "INT")
124+
public static IntLiteral unixTimestamp2(LiteralExpr arg) throws AnalysisException {
125+
long unixTime = ((DateLiteral) arg).unixTimestamp(TimeUtils.getTimeZone()) / 1000;
126+
// date before 1970-01-01 or after 2038-01-19 03:14:07 should return 0 for unix_timestamp() function
127+
unixTime = unixTime < 0 ? 0 : unixTime;
128+
unixTime = unixTime > Integer.MAX_VALUE ? 0 : unixTime;
129+
return new IntLiteral(unixTime, Type.INT);
130+
}
131+
123132
@FEFunction(name = "from_unixtime", argTypes = { "INT" }, returnType = "VARCHAR")
124133
public static StringLiteral fromUnixTime(LiteralExpr unixTime) throws AnalysisException {
125-
//if unixTime < 0, we should return null, throw a exception and let BE process
134+
// if unixTime < 0, we should return null, throw a exception and let BE process
126135
if (unixTime.getLongValue() < 0) {
127136
throw new AnalysisException("unixtime should larger than zero");
128137
}
129138
DateLiteral dl = new DateLiteral(unixTime.getLongValue() * 1000, TimeUtils.getTimeZone(), Type.DATETIME);
130139
return new StringLiteral(dl.getStringValue());
131140
}
141+
132142
@FEFunction(name = "from_unixtime", argTypes = { "INT", "VARCHAR" }, returnType = "VARCHAR")
133143
public static StringLiteral fromUnixTime(LiteralExpr unixTime, StringLiteral fmtLiteral) throws AnalysisException {
134-
//if unixTime < 0, we should return null, throw a exception and let BE process
144+
// if unixTime < 0, we should return null, throw a exception and let BE process
135145
if (unixTime.getLongValue() < 0) {
136146
throw new AnalysisException("unixtime should larger than zero");
137147
}

0 commit comments

Comments
 (0)