Reference code: https://github.com/apache/rocketmq-externals/blob/master/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMqUtilsTest.java
I set the consumer strategy to ConsumerStrategy.lastest(), but the setting doesn't seem to take effect: after the program restarts, it consumes historical data again.
How can I solve this problem?
The complete code is as follows:
try {
    Map<String, String> optionParams = new HashMap<>();
    optionParams.put(RocketMQConfig.NAME_SERVER_ADDR, nameSrvAddr);
    SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver").setMaster("local[*]");
    JavaStreamingContext sc = new JavaStreamingContext(sparkConf, new Duration(duration));
    List<String> topics = new ArrayList<>();
    if (StringUtils.hasText(topic)) {
        for (String s : topic.split(";")) {
            topics.add(s);
        }
    }
    LocationStrategy locationStrategy = LocationStrategy.PreferConsistent();
    JavaInputDStream<MessageExt> stream = RocketMqUtils.createJavaMQPullStream(sc, groupId,
            topics, ConsumerStrategy.lastest(), false, false, false, locationStrategy, optionParams);
    stream.foreachRDD(new VoidFunction<JavaRDD<MessageExt>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(JavaRDD<MessageExt> messageExtJavaRDD) throws Exception {
            JavaRDD<GPSRDD> GPSRDDJavaRDD = messageExtJavaRDD.map(new Function<MessageExt, GPSRDD>() {
                private static final long serialVersionUID = 1L;

                @Override
                public GPSRDD call(MessageExt messageExt) throws Exception {
                    GPSRDD gps = new GPSRDD();
                    String xxx = new String(messageExt.getBody());
                    System.out.println(xxx);
                    return gps;
                }
            });
        }
    });
    sc.start();
} catch (Exception e) {
    e.printStackTrace();
}