import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
public class BankBalanceExactlyOnceApp { | |
public static void main(String[] args) { | |
Properties config = new Properties(); | |
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank-balance-application"); | |
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); | |
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | |
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod | |
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0"); | |
// Exactly once processing!! | |
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); | |
// json Serde | |
final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); | |
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); | |
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer); | |
KStreamBuilder builder = new KStreamBuilder(); | |
KStream<String, JsonNode> bankTransactions = | |
builder.stream(Serdes.String(), jsonSerde, "bank-transactions"); | |
// create the initial json object for balances | |
ObjectNode initialBalance = JsonNodeFactory.instance.objectNode(); | |
initialBalance.put("count", 0); | |
initialBalance.put("balance", 0); | |
initialBalance.put("time", Instant.ofEpochMilli(0L).toString()); | |
KTable<String, JsonNode> bankBalance = bankTransactions | |
.groupByKey(Serdes.String(), jsonSerde) | |
.aggregate( | |
() -> initialBalance, | |
(key, transaction, balance) -> newBalance(transaction, balance), | |
jsonSerde, | |
"bank-balance-agg" | |
); | |
bankBalance.to(Serdes.String(), jsonSerde,"bank-balance-exactly-once"); | |
KafkaStreams streams = new KafkaStreams(builder, config); | |
streams.cleanUp(); | |
streams.start(); | |
// print the topology | |
System.out.println(streams.toString()); | |
// shutdown hook to correctly close the streams application | |
Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); | |
} | |
private static JsonNode newBalance(JsonNode transaction, JsonNode balance) { | |
// create a new balance json object | |
ObjectNode newBalance = JsonNodeFactory.instance.objectNode(); | |
newBalance.put("count", balance.get("count").asInt() + 1); | |
newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt()); | |
Long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli(); | |
Long transactionEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli(); | |
Instant newBalanceInstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transactionEpoch)); | |
newBalance.put("time", newBalanceInstant.toString()); | |
return newBalance; | |
} | |
} |
Wednesday, 11 September 2019
BankBalanceExactlyOnceApp.java
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment