|
36 | 36 | import org.slf4j.Logger;
|
37 | 37 | import org.slf4j.LoggerFactory;
|
38 | 38 |
|
| 39 | +import java.io.ByteArrayInputStream; |
39 | 40 | import java.io.IOException;
|
40 | 41 | import java.io.InputStream;
|
41 | 42 | import java.io.OutputStream;
|
@@ -685,11 +686,37 @@ public static TypedProperties fetchConfigs(
|
685 | 686 |
|
686 | 687 | public static void recoverIfNeeded(HoodieStorage storage, StoragePath cfgPath,
|
687 | 688 | StoragePath backupCfgPath) throws IOException {
|
| 689 | + boolean needCopy = false; |
688 | 690 | if (!storage.exists(cfgPath)) {
|
689 |
| - // copy over from backup |
690 |
| - try (InputStream in = storage.open(backupCfgPath); |
691 |
| - OutputStream out = storage.create(cfgPath, false)) { |
692 |
| - FileIOUtils.copy(in, out); |
| 691 | + needCopy = true; |
| 692 | + } else { |
| 693 | + TypedProperties props = new TypedProperties(); |
| 694 | + try (InputStream in = storage.open(cfgPath)) { |
| 695 | + props.load(in); |
| 696 | + if (!props.containsKey(TABLE_CHECKSUM.key()) || !HoodieTableConfig.validateChecksum(props)) { |
| 697 | + // the cfg file is invalid |
| 698 | + storage.deleteFile(cfgPath); |
| 699 | + needCopy = true; |
| 700 | + } |
| 701 | + } |
| 702 | + } |
| 703 | + if (needCopy && storage.exists(backupCfgPath)) { |
| 704 | + byte[] bytes = FileIOUtils.readAsByteArray(storage.open(backupCfgPath)); |
| 705 | + // check whether existing backup file is valid or not |
| 706 | + try (InputStream backupStream = new ByteArrayInputStream(bytes)) { |
| 707 | + TypedProperties backupProps = new TypedProperties(); |
| 708 | + backupProps.load(backupStream); |
| 709 | + if (!backupProps.containsKey(TABLE_CHECKSUM.key()) || !HoodieTableConfig.validateChecksum(backupProps)) { |
| 710 | + // need to delete the backup as anyway reads will also fail |
| 711 | + // subsequent writes will recover and update |
| 712 | + storage.deleteFile(backupCfgPath); |
| 713 | + LOG.warn("Invalid properties file {}: {}", backupCfgPath, backupProps); |
| 714 | + throw new IOException("Corrupted backup file"); |
| 715 | + } |
| 716 | + // copy over from backup |
| 717 | + try (OutputStream out = storage.create(cfgPath, false)) { |
| 718 | + out.write(bytes); |
| 719 | + } |
693 | 720 | }
|
694 | 721 | }
|
695 | 722 | // regardless, we don't need the backup anymore.
|
|
0 commit comments