Programmering

Bygget for sanntid: Big data messaging med Apache Kafka, del 2

I første halvdel av denne JavaWorld-introduksjonen til Apache Kafka utviklet du et par småskala produsent / forbrukerapplikasjoner ved bruk av Kafka. Fra disse øvelsene bør du være kjent med det grunnleggende i Apache Kafka meldingssystem. I denne andre halvdelen lærer du hvordan du bruker partisjoner for å distribuere belastning og skalere applikasjonen horisontalt, og håndtere opptil millioner meldinger per dag. Du lærer også hvordan Kafka bruker meldingsforskyvninger for å spore og administrere kompleks meldingsbehandling, og hvordan du beskytter Apache Kafka-meldingssystemet mot feil hvis en forbruker skulle gå ned. Vi utvikler eksempelapplikasjonen fra del 1 for både utgivelsesabonnement og punkt-til-punkt-brukstilfeller.

Skillevegger i Apache Kafka

Emner i Kafka kan deles inn i partisjoner. Når du for eksempel oppretter et emne som heter Demo, kan du konfigurere det til å ha tre partisjoner. Serveren oppretter tre loggfiler, en for hver av demopartisjonene. Når en produsent publiserte en melding til emnet, tildelte den en partisjons-ID for den meldingen. Serveren vil da bare legge meldingen til loggfilen for den partisjonen.

Hvis du deretter startet to forbrukere, kan serveren tilordne partisjoner 1 og 2 til den første forbrukeren, og partisjon 3 til den andre forbrukeren. Hver forbruker ville bare lese fra de tildelte partisjonene. Du kan se demo-emnet konfigurert for tre partisjoner i figur 1.

For å utvide scenariet, forestill deg en Kafka-klynge med to meglere, plassert i to maskiner. Når du partisjonerte demo-emnet, ville du konfigurere det til å ha to partisjoner og to kopier. For denne typen konfigurasjon tildeler Kafka-serveren de to partisjonene til de to meglerne i klyngen din. Hver megler ville være leder for en av partisjonene.

Når en produsent publiserte en melding, ville den gå til partisjonslederen. Lederen ville ta meldingen og legge den til i loggfilen på den lokale maskinen. Den andre megleren ville passivt replikere den forpliktende loggen til sin egen maskin. Hvis partisjonslederen gikk ned, ville den andre megleren bli den nye lederen og begynne å betjene kundeforespørsler. På samme måte, når en forbruker sendte en forespørsel til en partisjon, ville den forespørselen først gå til partisjonslederen, som ville returnere de forespurte meldingene.

Fordeler med partisjonering

Vurder fordelene ved å partisjonere et Kafka-basert meldingssystem:

  1. Skalerbarhet: I et system med bare en partisjon lagres meldinger publisert til et emne i en loggfil som finnes på en enkelt maskin. Antall meldinger for et emne må passe inn i en enkelt loggfil, og størrelsen på lagrede meldinger kan aldri være mer enn maskinens diskplass. Ved å dele et emne kan du skalere systemet ditt ved å lagre meldinger på forskjellige maskiner i en klynge. Hvis du for eksempel ønsket å lagre 30 gigabyte (GB) meldinger for emnet Demo, kan du bygge en Kafka-klynge på tre maskiner, hver med 10 GB diskplass. Deretter vil du konfigurere emnet til å ha tre partisjoner.
  2. Balanse mellom server og last: Å ha flere partisjoner lar deg spre meldingsforespørsler på tvers av meglere. Hvis du for eksempel hadde et emne som behandlet 1 million meldinger per sekund, kan du dele det i 100 partisjoner og legge til 100 meglere i klyngen din. Hver megler ville være leder for enkelt partisjon, ansvarlig for å svare på bare 10 000 kundeforespørsler per sekund.
  3. Balanse mellom forbruker og last: I likhet med serverbelastningsjustering, kan du spre forbrukerbelastningen ved å være vert for flere forbrukere på forskjellige maskiner. La oss si at du ønsket å konsumere 1 million meldinger per sekund fra et emne med 100 partisjoner. Du kan opprette 100 forbrukere og kjøre dem parallelt. Kafka-serveren tilordner en partisjon til hver av forbrukerne, og hver forbruker vil behandle 10.000 meldinger parallelt. Siden Kafka tilordner hver partisjon til bare en forbruker, vil hver melding bli konsumert i rekkefølge innenfor partisjonen.

To måter å partisjonere

Produsenten er ansvarlig for å bestemme hvilken partisjon en melding skal gå til. Produsenten har to muligheter for å kontrollere denne oppgaven:

  • Egendefinert partisjonering: Du kan opprette en klasse som implementerer org.apache.kafka.clients.producer.Partitioner grensesnitt. Denne skikken Partisjonering vil implementere forretningslogikken for å bestemme hvor meldinger sendes.
  • DefaultPartitioner: Hvis du ikke oppretter en egendefinert partisjoneringsklasse, er standardinnstillingen org.apache.kafka.clients.producer.internals.DefaultPartitioner klasse vil bli brukt. Standard partisjoneringsenhet er god nok i de fleste tilfeller, og gir tre alternativer:
    1. Håndbok: Når du oppretter en Produsentopptak, bruk den overbelastede konstruktøren ny ProducerRecord (topicName, partitionId, messageKey, message) for å spesifisere en partisjons-ID.
    2. Hashing (lokalitetsfølsom): Når du oppretter en Produsentopptak, spesifiser en meldingKey, ved å ringe ny ProducerRecord (topicName, messageKey, message). DefaultPartitioner vil bruke hash av nøkkelen for å sikre at alle meldinger for samme nøkkel går til samme produsent. Dette er den enkleste og vanligste tilnærmingen.
    3. Sprøyting (Random Load Balancing): Hvis du ikke vil kontrollere hvilke partisjonsmeldinger som går til, kan du bare ringe ny ProducerRecord (topicName, melding) å lage din Produsentopptak. I dette tilfellet vil partisjonæren sende meldinger til alle partisjonene på rund-robin-måte, og sikre en balansert serverbelastning.

Partisjonere et Apache Kafka-program

For det enkle produsent / forbrukereksemplet i del 1 brukte vi a DefaultPartitioner. Nå prøver vi å lage en tilpasset partisjonering i stedet. For dette eksemplet, la oss anta at vi har et detaljhandelsted som forbrukere kan bruke til å bestille produkter hvor som helst i verden. Basert på bruk vet vi at de fleste forbrukere er i enten USA eller India. Vi ønsker å dele vår søknad om å sende ordrer fra USA eller India til sine respektive forbrukere, mens bestillinger fra andre steder vil gå til en tredje forbruker.

For å starte, oppretter vi en CountryPartitioner som implementerer org.apache.kafka.clients.producer.Partitioner grensesnitt. Vi må implementere følgende metoder:

  1. Kafka vil ringe konfigurer () når vi initialiserer Partisjonering klasse, med en Kart av konfigurasjonsegenskaper. Denne metoden initialiserer funksjoner som er spesifikke for applikasjonens forretningslogikk, for eksempel å koble til en database. I dette tilfellet vil vi ha en ganske generisk partisjonering som tar landsnavn som en eiendom. Vi kan da bruke configProperties.put ("partitions.0", "USA") for å kartlegge flyten av meldinger til partisjoner. I fremtiden kan vi bruke dette formatet til å endre hvilke land som får sin egen partisjon.
  2. De Produsent API-anrop skillevegg() en gang for hver melding. I dette tilfellet bruker vi den til å lese meldingen og analysere navnet på landet fra meldingen. Hvis navnet på landet er i countryToPartitionMap, kommer den tilbake partisjonId lagret i Kart. Hvis ikke, vil den hash verdien av landet og bruke den til å beregne hvilken partisjon den skal gå til.
  3. Vi ringer Lukk() å stenge partisjonen. Ved å bruke denne metoden sikrer du at eventuelle ressurser som er ervervet under initialiseringen blir ryddet opp under avslutning.

Merk at når Kafka ringer konfigurer (), Kafka-produsenten vil overføre alle eiendommene som vi har konfigurert for produsenten til Partisjonering klasse. Det er viktig at vi bare leser de egenskapene som begynner med skillevegger., analyser dem for å få partisjonId, og lagre ID-en i countryToPartitionMap.

Nedenfor er vår tilpassede implementering av Partisjonering grensesnitt.

Oppføring 1. CountryPartitioner

 offentlig klasse CountryPartitioner implementerer Partitioner {privat statisk Kart countryToPartitionMap; public void configure (Map configs) {System.out.println ("Inside CountryPartitioner.configure" + configs); countryToPartitionMap = ny HashMap (); for (Map.Entry entry: configs.entrySet ()) {if (entry.getKey (). startsWith ("partitions.")) {String keyName = entry.getKey (); Strengverdi = (String) entry.getValue (); System.out.println (keyName.substring (11)); int paritionId = Integer.parseInt (keyName.substring (11)); countryToPartitionMap.put (verdi, paritionId); }}} offentlig int-partisjon (strengemne, objektnøkkel, byte [] keyBytes, objektverdi, byte [] valueBytes, klyngeklynge) {Liste partisjoner = cluster.availablePartitionsForTopic (emne); StrengverdiStr = (Streng) verdi; String countryName = ((String) verdi) .split (":") [0]; hvis (countryToPartitionMap.containsKey (countryName)) {// Hvis landet er kartlagt til en bestemt partisjon, returnerer det countryToPartitionMap.get (countryName); } annet {// Hvis ingen land er kartlagt til en bestemt partisjon, distribuer mellom gjenværende partisjoner int noOfPartitions = cluster.topics (). size (); returverdi.hashCode ()% noOfPartitions + countryToPartitionMap.size (); }} offentlig ugyldig lukk () {}} 

De Produsent klasse i oppføring 2 (nedenfor) er veldig lik vår enkle produsent fra del 1, med to endringer merket med fet skrift:

  1. Vi setter en konfigurasjonsegenskap med en nøkkel lik verdien av ProducerConfig.PARTITIONER_CLASS_CONFIG, som samsvarer med det fullstendige navnet på vårt CountryPartitioner klasse. Vi setter også landsnavn til partisjonId, og kartlegger dermed egenskapene vi vil overføre til CountryPartitioner.
  2. Vi passerer en forekomst av en klasse som implementerer org.apache.kafka.clients.producer.Callback grensesnitt som et annet argument for produsent. send () metode. Kafka-klienten vil ringe sin onCompletion () metode når en melding er publisert, vedlegger en RecordMetadata gjenstand. Vi kan bruke dette objektet til å finne ut hvilken partisjon en melding ble sendt til, samt forskyvningen som ble tildelt den publiserte meldingen.

Oppføring 2. En partisjonert produsent

 offentlig klasse produsent {privat statisk skanner i; public static void main (String [] argv) kaster Unntak {if (argv.length! = 1) {System.err.println ("Vennligst spesifiser 1 parametere"); System.exit (-1); } Streng topicName = argv [0]; i = ny skanner (System.in); System.out.println ("Skriv inn melding (skriv exit for å avslutte)"); // Konfigurer produsentegenskapene configProperties = nye egenskaper (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");  configProperties.put (ProducerConfig.PARTITIONER_CLASS_CONFIG, CountryPartitioner.class.getCanonicalName ()); configProperties.put ("partition.1", "USA"); configProperties.put ("partition.2", "India");  org.apache.kafka.clients.producer.Producer produsent = ny KafkaProducer (configProperties); Strenglinje = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = new ProducerRecord (topicName, null, line); produsent. send (rek, new Callback () {public void onCompletion (RecordMetadata metadata, Unntak unntak) {System.out.println ("Melding sendt til emne ->" + metadata.topic () + ", parisjon->" + metadata.partition () + "lagret ved offset->" + metadata.offset ()); ; }}); linje = in.nextLine (); } in.close (); produsent.lukk (); }} 

Tilordne partisjoner til forbrukere

Kafka-serveren garanterer at en partisjon er tildelt bare en forbruker, og derved garanterer rekkefølgen på meldingsforbruket. Du kan tilordne en partisjon manuelt eller få den tildelt automatisk.

Hvis bedriftslogikken din krever mer kontroll, må du tilordne partisjoner manuelt. I dette tilfellet vil du bruke KafkaConsumer.assign () å overføre en liste over partisjoner som hver forbruker var interessert i, til Kakfa-serveren.

Å ha partisjoner tildelt automatisk er standard og vanligste valg. I dette tilfellet vil Kafka-serveren tildele en partisjon til hver forbruker, og vil tildele partisjoner på nytt for å skalere for nye forbrukere.

Si at du lager et nytt emne med tre partisjoner. Når du starter den første forbrukeren for det nye emnet, vil Kafka tildele alle tre partisjonene til samme forbruker. Hvis du starter en ny forbruker, vil Kafka tildele alle partisjonene på nytt, tildele en partisjon til den første forbrukeren og de resterende to partisjonene til den andre forbrukeren. Hvis du legger til en tredje forbruker, vil Kafka tildele partisjonene på nytt, slik at hver forbruker får tildelt en enkelt partisjon. Til slutt, hvis du starter fjerde og femte forbruker, vil tre av forbrukerne ha en tildelt partisjon, men de andre vil ikke motta noen meldinger. Hvis en av de tre første partisjonene går ned, vil Kafka bruke den samme partisjoneringslogikken for å overføre forbrukerens partisjon til en av de ekstra forbrukerne.

$config[zx-auto] not found$config[zx-overlay] not found