Skip to content

Commit e17a2ea

Browse files
authored
Fixed issue with ivarator building visitor where the directory used was not specific enough to prevent improper reuse (#3090).
1 parent 31f6492 commit e17a2ea

File tree

4 files changed

+129
-6
lines changed

4 files changed

+129
-6
lines changed

warehouse/query-core/src/main/java/datawave/core/iterators/DatawaveFieldIndexCachingIteratorJexl.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,11 +1239,14 @@ protected IvaratorFuture fillSet(final Range boundingFiRange, final TotalResults
12391239
String taskName = getTaskName(boundingFiRange);
12401240
IvaratorFuture future = IteratorThreadPoolManager.getIvaratorFuture(taskName, this.initEnv);
12411241
if (future == null) {
1242+
log.debug("Creating ivarator runnable for " + taskName);
12421243
// no future exists, so get a source and create/execute a new IvaratorRunnable
12431244
// this will block until an ivarator source becomes available
12441245
SortedKeyValueIterator<Key,Value> source = takePoolSource();
12451246
IvaratorRunnable ivaratorRunnable = new IvaratorRunnable(this, source, boundingFiRange, boundingFiRange, this.fiRow, this.queryId, totalResults);
12461247
future = IteratorThreadPoolManager.executeIvarator(ivaratorRunnable, taskName, this.initEnv);
1248+
} else {
1249+
log.debug("Found ivarator runnable for " + taskName);
12471250
}
12481251
return future;
12491252
}
@@ -1256,7 +1259,9 @@ public String getTaskName(Range boundingFiRange) {
12561259
sb.append(" queryId:").append(queryId);
12571260
sb.append(" fiRow:").append(fiRow);
12581261
sb.append(" iHash:").append(getIHash(fiRow));
1262+
sb.append(" directory:").append(controlDir);
12591263
sb.append(" termNumber:").append(termNumber);
1264+
sb.append(" range:").append(boundingFiRange);
12601265
sb.append(" rangeHash:").append(Math.abs(boundingFiRange.hashCode() / 2));
12611266
return sb.toString();
12621267
}
@@ -1306,15 +1311,15 @@ protected boolean resumeFromIvaratorFutures() {
13061311
String taskName = ivaratorRunnable.getTaskName();
13071312
Status status = ivaratorRunnable.getStatus();
13081313
if (!status.equals(CREATED) && !status.equals(SUSPENDED) && !status.equals(COMPLETED)) {
1309-
log.error(String.format("Resuming Ivarator %s failed - taskName:%s has status:%s", ivaratorInfo, taskName, status));
1314+
log.info(String.format("Resuming Ivarator %s failed - taskName:%s has status:%s", ivaratorInfo, taskName, status));
13101315
canResume = false;
13111316
break;
13121317
}
13131318
// All IvaratorRunnables from the previous execution must reference the same Ivarator or something is wrong
13141319
if (previousIvarator == null) {
13151320
previousIvarator = ivaratorRunnable.getIvarator();
13161321
} else if (previousIvarator != ivaratorRunnable.getIvarator()) {
1317-
log.error(String.format("Resuming Ivarator %s failed - taskName:%s has inconsistent ivarator", ivaratorInfo, taskName));
1322+
log.info(String.format("Resuming Ivarator %s failed - taskName:%s has inconsistent ivarator", ivaratorInfo, taskName));
13181323
canResume = false;
13191324
break;
13201325
}
@@ -1346,7 +1351,7 @@ protected boolean resumeFromIvaratorFutures() {
13461351
} catch (IllegalStateException e) {
13471352
// unable to resume this IvaratorRunnable, it will get recreated later
13481353
// because its IvaratorFuture was removed a few lines above this
1349-
log.error(e.getMessage());
1354+
log.warn(e.getMessage());
13501355
}
13511356
} else if (status.equals(COMPLETED)) {
13521357
completed++;

warehouse/query-core/src/main/java/datawave/query/jexl/visitors/IteratorBuildingVisitor.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,18 +1036,49 @@ public Object visit(ASTNumberLiteral node, Object o) {
10361036
return null;
10371037
}
10381038

1039+
public String getDocument(Range range) {
1040+
// if the range has the same document in the start and end key, then return the document name.
1041+
// This is used to create unique ivarator directories for the DelayedNonEventSubTreeVisitor
1042+
if (range != null && range.getStartKey() != null && range.getEndKey() != null) {
1043+
String cf1 = range.getStartKey().getColumnFamily().toString();
1044+
String cf2 = range.getEndKey().getColumnFamily().toString();
1045+
StringBuilder builder = new StringBuilder();
1046+
int minLen = Math.min(cf1.length(), cf2.length());
1047+
for (int i = 0; i < minLen; i++) {
1048+
char c1 = cf1.charAt(i);
1049+
char c2 = cf2.charAt(i);
1050+
if (c1 == c2) {
1051+
if (c1 == NULL_DELIMETER.charAt(0)) {
1052+
c1 = '_';
1053+
}
1054+
builder.append(c1);
1055+
} else {
1056+
break;
1057+
}
1058+
}
1059+
if (builder.length() > 0) {
1060+
return builder.toString();
1061+
}
1062+
}
1063+
return null;
1064+
}
1065+
10391066
/**
10401067
* Build a list of potential hdfs directories based on each ivarator cache dir configs.
10411068
*
10421069
* @return A path
10431070
* @throws IOException
10441071
* for issues with read/write
10451072
*/
1046-
private List<IvaratorCacheDir> getIvaratorCacheDirs(int termNumber) throws IOException {
1073+
public List<IvaratorCacheDir> getIvaratorCacheDirs(int termNumber, Range rangeLimiter, String field, String value) throws IOException {
10471074
List<IvaratorCacheDir> pathAndFs = new ArrayList<>();
10481075

10491076
// use the ivaratorCount / term number to create a unique subdirectory
1050-
String subdirectory = ivaratorCacheSubDirPrefix + "term" + termNumber;
1077+
String subdirectory = ivaratorCacheSubDirPrefix + "_term_" + termNumber + "_field_" + field + "_valueHash_" + value.hashCode();
1078+
String document = getDocument(rangeLimiter);
1079+
if (document != null) {
1080+
subdirectory = subdirectory + "_doc_" + document;
1081+
}
10511082

10521083
if (ivaratorCacheDirConfigs != null && !ivaratorCacheDirConfigs.isEmpty()) {
10531084
for (IvaratorCacheDirConfig config : ivaratorCacheDirConfigs) {
@@ -1152,6 +1183,7 @@ public void ivarateList(JexlNode rootNode, JexlNode sourceNode, Object data) thr
11521183
fst = DatawaveFieldIndexListIteratorJexl.FSTManager.get(new Path(fstUri), hdfsFileCompressionCodec,
11531184
hdfsFileSystem.getFileSystem(fstUri));
11541185
}
1186+
listIterBuilder.setValue(fstUri.toString());
11551187
listIterBuilder.setFst(fst);
11561188

11571189
// cache this fst for use during JexlEvaluation.
@@ -1453,7 +1485,7 @@ public void ivarate(IvaratorBuilder builder, JexlNode rootNode, JexlNode sourceN
14531485
builder.setCompositeSeekThreshold(compositeSeekThreshold);
14541486
builder.setDatatypeFilter(getDatatypeFilter());
14551487
builder.setKeyTransform(getFiAggregator());
1456-
builder.setIvaratorCacheDirs(getIvaratorCacheDirs(this.ivaratorCount));
1488+
builder.setIvaratorCacheDirs(getIvaratorCacheDirs(this.ivaratorCount, rangeLimiter, builder.getField(), builder.getValue()));
14571489
builder.setTermNumber(this.ivaratorCount);
14581490
builder.setHdfsFileCompressionCodec(hdfsFileCompressionCodec);
14591491
builder.setQueryLock(queryLock);

warehouse/query-core/src/test/java/datawave/core/iterators/DatawaveFieldIndexIteratorJexlTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.accumulo.core.data.Range;
1414
import org.apache.hadoop.conf.Configuration;
1515
import org.apache.hadoop.fs.FileSystem;
16+
import org.apache.hadoop.fs.Path;
1617
import org.apache.hadoop.io.Text;
1718
import org.junit.After;
1819
import org.junit.Before;
@@ -140,4 +141,26 @@ public void buildBoundingFiRange_notUpperInclusive_multiCharStartSingleEnd_test(
140141
assertEquals(new Key(row, fiName, fieldValueNullAppended), r.getStartKey());
141142
assertEquals(new Key(row, fiName, new Text("y" + Constants.MAX_UNICODE_STRING)), r.getEndKey());
142143
}
144+
145+
@Test
146+
public void taskName_test() {
147+
DatawaveFieldIndexFilterIteratorJexl iteratorJexl = DatawaveFieldIndexFilterIteratorJexl.builder().upperInclusive(false).lowerInclusive(true)
148+
.withMaxRangeSplit(1).withFieldName("FIELD").withFieldValue("a").withUpperBound("z").withScanId("scan1").withQueryId("QID_1")
149+
.withTermNumber(12345).withIvaratorCacheDirs(cacheDirs).build();
150+
151+
String shard = "20250101_01";
152+
String dt = "dt";
153+
String uid = "2fe9872hg.1908h21f.10398hff1";
154+
Key start = new Key(shard, dt + '\u0000' + uid + '\u0000');
155+
Key end = new Key(shard, dt + '\u0000' + uid + new String(Character.toChars(Character.MAX_CODE_POINT)));
156+
Range range = new Range(start, false, end, false);
157+
String taskName = iteratorJexl.getTaskName(range);
158+
159+
assertTrue(taskName.contains(iteratorJexl.toStringNoQueryId()));
160+
assertTrue(taskName.contains("scan1"));
161+
assertTrue(taskName.contains("QID_1"));
162+
assertTrue(taskName.contains(new Path(cacheDirs.get(0).getPathURI().toString()).toString()));
163+
assertTrue(taskName.contains("12345"));
164+
assertTrue(taskName.contains(range.toString()));
165+
}
143166
}

warehouse/query-core/src/test/java/datawave/query/jexl/visitors/IteratorBuildingVisitorTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,21 @@
2222
import org.apache.commons.jexl3.parser.JexlNodes;
2323
import org.apache.commons.jexl3.parser.ParseException;
2424
import org.junit.Assert;
25+
import org.junit.ClassRule;
2526
import org.junit.Ignore;
2627
import org.junit.Test;
28+
import org.junit.rules.TemporaryFolder;
2729

30+
import datawave.core.iterators.filesystem.FileSystemCache;
2831
import datawave.query.Constants;
2932
import datawave.query.attributes.Attribute;
3033
import datawave.query.attributes.Document;
3134
import datawave.query.exceptions.DatawaveFatalQueryException;
3235
import datawave.query.iterator.NestedIterator;
3336
import datawave.query.iterator.SeekableNestedIterator;
3437
import datawave.query.iterator.SortedListKeyValueIterator;
38+
import datawave.query.iterator.ivarator.IvaratorCacheDir;
39+
import datawave.query.iterator.ivarator.IvaratorCacheDirConfig;
3540
import datawave.query.jexl.JexlASTHelper;
3641
import datawave.query.jexl.JexlNodeFactory;
3742
import datawave.query.jexl.LiteralRange;
@@ -40,6 +45,9 @@
4045

4146
public class IteratorBuildingVisitorTest {
4247

48+
@ClassRule
49+
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
50+
4351
private IteratorBuildingVisitor getDefault() {
4452
IteratorBuildingVisitor visitor = new IteratorBuildingVisitor();
4553
visitor.setSource(new SourceFactory(Collections.emptyIterator()), new TestIteratorEnvironment());
@@ -48,6 +56,61 @@ private IteratorBuildingVisitor getDefault() {
4856
return visitor;
4957
}
5058

59+
@Test
60+
public void rangeToDocumentTest() {
61+
IteratorBuildingVisitor visitor = getDefault();
62+
String shard = "20250101_01";
63+
String dt = "dt";
64+
String uid = "2fe9872hg.1908h21f.10398hff1";
65+
Key start = new Key(shard, dt + '\u0000' + uid + '\u0000');
66+
Key end = new Key(shard, dt + '\u0000' + uid + new String(Character.toChars(Character.MAX_CODE_POINT)));
67+
Range range = new Range(start, false, end, false);
68+
String doc = visitor.getDocument(range);
69+
Assert.assertEquals(dt + '_' + uid, doc);
70+
71+
end = new Key(shard, new String(Character.toChars(Character.MAX_CODE_POINT)) + "YIELD_BEGIN");
72+
range = new Range(start, false, end, false);
73+
doc = visitor.getDocument(range);
74+
Assert.assertEquals(null, doc);
75+
76+
start = new Key(shard);
77+
end = new Key(shard + '\u0000');
78+
range = new Range(start, false, end, false);
79+
doc = visitor.getDocument(range);
80+
Assert.assertEquals(null, doc);
81+
82+
doc = visitor.getDocument(null);
83+
Assert.assertEquals(null, doc);
84+
}
85+
86+
@Test
87+
public void ivaratorCacheDirTest() throws IOException {
88+
IteratorBuildingVisitor visitor = getDefault();
89+
String folder = temporaryFolder.newFolder().toURI().toString();
90+
List<IvaratorCacheDirConfig> configs = Collections.singletonList(new IvaratorCacheDirConfig(folder));
91+
visitor.setIvaratorCacheDirConfigs(configs);
92+
visitor.setQueryId("QID_1");
93+
visitor.setHdfsFileSystem(new FileSystemCache(null));
94+
String expected = folder + "QID_1/_term_1_field_field_valueHash_" + "value".hashCode();
95+
96+
List<IvaratorCacheDir> dirs = visitor.getIvaratorCacheDirs(1, null, "field", "value");
97+
98+
IvaratorCacheDir config = dirs.get(0);
99+
Assert.assertEquals(expected, config.getPathURI().toString());
100+
101+
String shard = "20250101_01";
102+
String dt = "dt";
103+
String uid = "2fe9872hg.1908h21f.10398hff1";
104+
Key start = new Key(shard, dt + '\u0000' + uid + '\u0000');
105+
Key end = new Key(shard, dt + '\u0000' + uid + new String(Character.toChars(Character.MAX_CODE_POINT)));
106+
Range range = new Range(start, false, end, false);
107+
dirs = visitor.getIvaratorCacheDirs(1, range, "field", "value");
108+
109+
config = dirs.get(0);
110+
// make sure we now have the document in the path
111+
Assert.assertEquals(expected + "_doc_" + dt + '_' + uid, config.getPathURI().toString());
112+
}
113+
51114
/**
52115
* null value should result in no iterator being built
53116
*/

0 commit comments

Comments
 (0)