Skip to content

Remember previously assigned shardId #3106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: integration
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ public interface RawRecordContainer {

void setDataType(Type dataType);

/**
* Gets the previoulsy assigned shardId if present. This is only used for non-standard flows such as re-processing.
*
* @return Previously assigned shardId or null if one does not exist
*/
default String getShardId() {
return null;
}

/**
* Gets the primary date associated with the record, a.k.a the "event date"
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,19 @@ public int getNumShards(String date) {
* @return the shard id
*/
public String getShardId(RawRecordContainer record) {
String shardId = getBaseShardId(record);
for (ShardIdGenerator generator : generators) {
if (generator.isApplicable(record)) {
int numShards = getNumShards(record.getDate());
shardId = generator.getShardId(record, shardId, numShards);
break;
String shardId = record.getShardId();

if (shardId == null) {
shardId = getBaseShardId(record);
for (ShardIdGenerator generator : generators) {
if (generator.isApplicable(record)) {
int numShards = getNumShards(record.getDate());
shardId = generator.getShardId(record, shardId, numShards);
break;
}
}
}

return shardId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ private void setupMocks() {
EasyMock.expect(event.getDataType()).andReturn(TypeRegistry.getType(TEST_TYPE)).anyTimes();
EasyMock.expect(event.getId()).andReturn(TEST_UID).anyTimes();
EasyMock.expect(event.getDate()).andReturn(timestamp).anyTimes();
EasyMock.expect(event.getShardId()).andReturn(null).anyTimes();
EasyMock.expect(event.getRawFileName()).andReturn("dummy_filename.txt").anyTimes();
EasyMock.replay(event);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package datawave.ingest.mapreduce.handler.shard;

import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import datawave.data.hash.HashUID;
import datawave.ingest.config.RawRecordContainerImpl;
import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.RawRecordContainerImplTest;
import datawave.ingest.data.Type;
Expand All @@ -17,14 +16,19 @@

class ShardIdFactoryTest {

private String uid = "1.2.3";
private String date = "20240115";
private String dataType = "csva";
private int numShards = 10;

private Configuration conf;

@BeforeEach
void setUp() {
conf = new Configuration();
conf.set("num.shards", "10");
conf.setInt("num.shards", numShards);

Type csva = new Type("csva", CSVIngestHelper.class, null, null, 0, null);
Type csva = new Type(dataType, CSVIngestHelper.class, null, null, 0, null);

TypeRegistry.reset();
TypeRegistry registry = TypeRegistry.getInstance(conf);
Expand All @@ -37,7 +41,7 @@ void setUp() {
*/
@Test
void testGetShardIdWithNoGenerators() {
RawRecordContainer event = createEvent("1.2.3", DateHelper.parse("20240115"), "csva");
RawRecordContainer event = createEvent(uid, date, dataType);
String shardId = new ShardIdFactory(conf).getShardId(event);
Assertions.assertEquals("20240115_2", shardId);
}
Expand All @@ -56,7 +60,7 @@ void testGetShardIdWithNoApplicableGenerators() {
conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".2.typeName", "text");
conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".2.partition", "10");

RawRecordContainer event = createEvent("1.2.3", DateHelper.parse("20240115"), "csva");
RawRecordContainer event = createEvent(uid, date, dataType);
String shardId = new ShardIdFactory(conf).getShardId(event);
Assertions.assertEquals("20240115_2", shardId);
}
Expand All @@ -72,10 +76,10 @@ void testGetShardIdWithSingleApplicableGenerators() {
conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".1.partition", "20");

conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".2", ConstantPartition.class.getName());
conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".2.typeName", "csva");
conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".2.typeName", dataType);
conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".2.partition", "10");

RawRecordContainer event = createEvent("1.2.3", DateHelper.parse("20240115"), "csva");
RawRecordContainer event = createEvent(uid, date, dataType);
String shardId = new ShardIdFactory(conf).getShardId(event);
Assertions.assertEquals("20240115_10", shardId);
}
Expand All @@ -87,24 +91,54 @@ void testGetShardIdWithSingleApplicableGenerators() {
@Test
void testGetShardIdWithMultipleApplicableGenerators() {
conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".1", ConstantPartition.class.getName());
conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".1.typeName", "csva");
conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".1.typeName", dataType);
conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".1.partition", "20");

conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".2", ConstantPartition.class.getName());
conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".2.typeName", "csva");
conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".2.typeName", dataType);
conf.set(ShardIdFactory.SHARD_ID_GENERATOR + ".2.partition", "10");

RawRecordContainer event = createEvent("1.2.3", DateHelper.parse("20240115"), "csva");
RawRecordContainer event = createEvent(uid, date, dataType);
String shardId = new ShardIdFactory(conf).getShardId(event);
Assertions.assertEquals("20240115_20", shardId);
}

private RawRecordContainer createEvent(String uid, Date date, String dataType) {
@Test
public void testGetShardIdWhenPreviouslyAssigned() {
// Create a shard higher than our numShards to prove we are not computing it
String expectedShardId = date + "_" + (numShards * 2);

RawRecordContainer event = new EventWithShardId(expectedShardId);
initEvent(event, uid, date, dataType);

String actualShardId = new ShardIdFactory(conf).getShardId(event);

Assertions.assertEquals(expectedShardId, actualShardId);
}

private RawRecordContainer createEvent(String uid, String date, String dataType) {
RawRecordContainerImplTest.ValidatingRawRecordContainerImpl event = new RawRecordContainerImplTest.ValidatingRawRecordContainerImpl();
initEvent(event, uid, date, dataType);
return event;
}

private void initEvent(RawRecordContainer event, String uid, String date, String dataType) {
event.setId(HashUID.parse(uid));
event.setTimestamp(date.getTime());
event.setTimestamp(DateHelper.parse(date).getTime());
event.setDataType(TypeRegistry.getType(dataType));
return event;
}

public static class EventWithShardId extends RawRecordContainerImpl {
private String shardId;

public EventWithShardId(String shardId) {
this.shardId = shardId;
}

@Override
public String getShardId() {
return shardId;
}
}

public static class ConstantPartition implements ShardIdGenerator {
Expand Down