Programmering

Bygget for sanntid: Big data messaging med Apache Kafka, del 1

Da big data-bevegelsen startet, var det mest fokusert på batchbehandling. Distribuerte datalagrings- og spørringsverktøy som MapReduce, Hive og Pig var alle designet for å behandle data i batcher i stedet for kontinuerlig. Bedrifter kjører flere jobber hver natt for å hente ut data fra en database, deretter analysere, transformere og til slutt lagre dataene. Nyere har bedrifter oppdaget kraften i å analysere og behandle data og hendelser når de skjer, ikke bare noen få timer. De fleste tradisjonelle meldingssystemer skaleres imidlertid ikke for å håndtere store data i sanntid. Så ingeniører hos LinkedIn bygde og åpne kilder Apache Kafka: et distribuert meldingsrammeverk som oppfyller kravene til stordata ved å skalere på råvare.

I løpet av de siste årene har Apache Kafka dukket opp for å løse en rekke brukssaker. I det enkleste tilfellet kan det være en enkel buffer for lagring av applikasjonslogger. Kombinert med en teknologi som Spark Streaming, kan den brukes til å spore dataendringer og iverksette tiltak på disse dataene før den lagres til en endelig destinasjon. Kafka's prediktive modus gjør det til et kraftig verktøy for å oppdage svindel, for eksempel å sjekke gyldigheten til en kredittkorttransaksjon når det skjer, og ikke vente på batchbehandling flere timer senere.

Denne todelte opplæringen introduserer Kafka, og begynner med hvordan du installerer og kjører den i utviklingsmiljøet ditt. Du får en oversikt over Kafkas arkitektur, etterfulgt av en introduksjon til å utvikle et out-of-the-box Apache Kafka meldingssystem. Til slutt bygger du en tilpasset produsent / forbrukerapplikasjon som sender og forbruker meldinger via en Kafka-server. I andre halvdel av opplæringen lærer du hvordan du partisjonerer og grupperer meldinger, og hvordan du styrer hvilke meldinger en Kafka-forbruker vil konsumere.

Hva er Apache Kafka?

Apache Kafka er meldingssystem bygget for å skalere for store data. I likhet med Apache ActiveMQ eller RabbitMq, lar Kafka applikasjoner bygget på forskjellige plattformer kommunisere via asynkron overføring av meldinger. Men Kafka skiller seg fra disse mer tradisjonelle meldingssystemene på sentrale måter:

  • Den er designet for å skalere horisontalt, ved å legge til flere vareservere.
  • Det gir mye høyere gjennomstrømning for både produsent- og forbrukerprosesser.
  • Den kan brukes til å støtte både batch- og sanntidsbruk.
  • Den støtter ikke JMS, Java sin meldingsorienterte mellomvare-API.

Apache Kafkas arkitektur

Før vi utforsker Kafkas arkitektur, bør du vite grunnleggende terminologi:

  • EN produsent er en prosess som kan publisere en melding til et emne.
  • en forbruker er en prosess som kan abonnere på ett eller flere emner og konsumere meldinger publisert til emner.
  • EN emnekategori er navnet på feeden som meldingene blir publisert til.
  • EN megler er en prosess som kjører på én maskin.
  • EN klynge er en gruppe meglere som jobber sammen.

Apache Kafkas arkitektur er veldig enkel, noe som kan gi bedre ytelse og gjennomstrømning i noen systemer. Hvert emne i Kafka er som en enkel loggfil. Når en produsent publiserer en melding, legger Kafka-serveren den til på slutten av loggfilen for det gitte emnet. Serveren tildeler også en forskyvning, som er et nummer som brukes til å identifisere hver melding permanent. Når antallet meldinger vokser, øker verdien av hver forskyvning; for eksempel hvis produsenten publiserer tre meldinger, kan den første få en forskyvning på 1, den andre en forskyvning på 2 og den tredje en forskyvning på 3.

Når Kafka-forbrukeren først starter, vil den sende en pull-forespørsel til serveren, og be om å hente eventuelle meldinger for et bestemt emne med en offsetverdi høyere enn 0. Serveren vil sjekke loggfilen for det emnet og returnere de tre nye meldingene . Forbrukeren vil behandle meldingene, og deretter sende en forespørsel om meldinger med en forskyvning høyere enn 3, og så videre.

I Kafka er klienten ansvarlig for å huske offsetellingen og hente meldinger. Kafka-serveren sporer ikke eller styrer meldingsforbruket. Som standard vil en Kafka-server beholde en melding i syv dager. En bakgrunnstråd i serveren sjekker og sletter meldinger som er syv dager eller eldre. En forbruker kan få tilgang til meldinger så lenge de er på serveren. Den kan lese en melding flere ganger, og til og med lese meldinger i motsatt rekkefølge. Men hvis forbrukeren ikke klarer å hente meldingen før de syv dagene er ute, vil den savne den meldingen.

Kafka-referanser

Produksjonsbruk av LinkedIn og andre virksomheter har vist at Apache Kafka med riktig konfigurasjon er i stand til å behandle hundrevis av gigabyte data daglig. I 2011 brukte tre LinkedIn-ingeniører referansetesting for å demonstrere at Kafka kunne oppnå mye høyere gjennomstrømning enn ActiveMQ og RabbitMQ.

Apache Kafka raskt oppsett og demo

Vi bygger en tilpasset applikasjon i denne opplæringen, men la oss starte med å installere og teste en Kafka-forekomst hos en produsent og forbruker utenom boksen.

  1. Besøk Kafka-nedlastingssiden for å installere den nyeste versjonen (0.9 når dette skrives).
  2. Pakk ut binærfiler i en programvare / kafka mappe. For den nåværende versjonen er det programvare / kafka_2.11-0.9.0.0.
  3. Endre den nåværende katalogen slik at den peker til den nye mappen.
  4. Start Zookeeper-serveren ved å utføre kommandoen: bin / zookeeper-server-start.sh config / zookeeper.properties.
  5. Start Kafka-serveren ved å utføre: bin / kafka-server-start.sh config / server.properties.
  6. Lag et testemne som du kan bruke til testing: bin / kafka-topics.sh --create - zookeeper localhost: 2181 - replikasjonsfaktor 1 - partisjoner 1 --topic javaworld.
  7. Start en enkel konsollforbruker som kan konsumere meldinger publisert til et gitt emne, for eksempel javaworld: bin / kafka-console-consumer.sh --zookeeper localhost: 2181 --topic javaworld - fra begynnelse.
  8. Start en enkel produsentkonsoll som kan publisere meldinger til testemnet: bin / kafka-console-producer.sh - meglerliste localhost: 9092 --topic javaworld.
  9. Prøv å skrive en eller to meldinger i produsentkonsollen. Meldingene dine skal vises i forbrukerkonsollen.

Eksempel på applikasjon med Apache Kafka

Du har sett hvordan Apache Kafka fungerer ut av esken. La oss deretter utvikle en tilpasset produsent / forbrukerapplikasjon. Produsenten vil hente brukerinngang fra konsollen og sende hver nye linje som en melding til en Kafka-server. Forbrukeren vil hente meldinger for et gitt emne og skrive dem ut til konsollen. Produsenten og forbrukerkomponentene i dette tilfellet er dine egne implementeringer av kafka-console-producer.sh og kafka-console-consumer.sh.

La oss starte med å lage en Produsent.java klasse. Denne klientklassen inneholder logikk for å lese brukerinngang fra konsollen og sende den som en melding til Kafka-serveren.

Vi konfigurerer produsenten ved å lage et objekt fra java.util.Eiendommer klasse og sette dens egenskaper. Klassen ProducerConfig definerer alle de forskjellige egenskapene som er tilgjengelige, men Kafkas standardverdier er tilstrekkelig for de fleste bruksområder. For standardkonfigurasjonen trenger vi bare å angi tre obligatoriske egenskaper:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) setter en liste over verts: portpar som brukes til å etablere de første tilkoblingene til Kakfa-klyngen i host1: port1, host2: port2, ... format. Selv om vi har mer enn en megler i Kafka-klyngen, trenger vi bare å spesifisere verdien av den første megleren vert: port. Kafka-klienten vil bruke denne verdien til å ringe megleren, som vil returnere en liste over alle meglerne i klyngen. Det er en god ide å spesifisere mer enn en megler i BOOTSTRAP_SERVERS_CONFIG, slik at hvis den første megleren er nede, vil klienten kunne prøve andre meglere.

Kafka-serveren forventer meldinger i byte [] -tast, byte [] -verdi format. I stedet for å konvertere hver nøkkel og verdi, tillater Kafka klientbibliotek oss å bruke vennligere typer som String og int for sending av meldinger. Biblioteket vil konvertere disse til riktig type. Eksempelappen har for eksempel ikke en meldingsspesifikk nøkkel, så vi bruker den null for nøkkelen. For verdien bruker vi a String, som er dataene som er angitt av brukeren på konsollen.

For å konfigurere meldingsnøkkel, setter vi en verdi på KEY_SERIALIZER_CLASS_CONFIGorg.apache.kafka.common.serialization.ByteArraySerializer. Dette fungerer fordi null trenger ikke konverteres til byte []. For meldingsverdi, vi setter VALUE_SERIALIZER_CLASS_CONFIGorg.apache.kafka.common.serialization.StringSerializer, fordi den klassen vet hvordan man konverterer en String inn i en byte [].

Egendefinerte nøkkel- / verdiobjekter

Lik StringSerializer, Kafka tilbyr serialisatorer for andre primitiver som int og lang. For å kunne bruke et egendefinert objekt for nøkkelen eller verdien vår, må vi lage en klasseimplementering org.apache.kafka.common.serialization.Serializer. Vi kan da legge til logikk for å serieisere klassen til byte []. Vi må også bruke en tilsvarende deserializer i vår forbrukerkode.

Kafka-produsenten

Etter å ha fylt Eiendommer klasse med de nødvendige konfigurasjonsegenskapene, kan vi bruke den til å lage et objekt av KafkaProdusent. Hver gang vi vil sende en melding til Kafka-serveren etter det, oppretter vi et objekt av Produsentopptak og ring KafkaProdusents sende() metoden med den posten for å sende meldingen. De Produsentopptak tar to parametere: navnet på emnet som meldingen skal publiseres til, og selve meldingen. Ikke glem å ringe Producer.close () metode når du er ferdig med produsenten:

Oppføring 1. KafkaProducer

 offentlig klasse produsent {privat statisk skanner i; public static void main (String [] argv) kaster Unntak {if (argv.length! = 1) {System.err.println ("Vennligst spesifiser 1 parametere"); System.exit (-1); } Streng topicName = argv [0]; i = ny skanner (System.in); System.out.println ("Skriv inn melding (skriv exit for å avslutte)"); // Konfigurer produsentegenskapene configProperties = nye egenskaper (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer produsent = ny KafkaProducer (configProperties); Strenglinje = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = new ProducerRecord (topicName, line); produsent. sende (rek); linje = in.nextLine (); } in.close (); produsent.lukk (); }} 

Konfigurere meldingen forbruker

Deretter lager vi en enkel forbruker som abonnerer på et emne. Hver gang en ny melding blir publisert til emnet, vil den lese meldingen og skrive den ut på konsollen. Forbrukerkoden er ganske lik produsentkoden. Vi starter med å lage et objekt av java.util.Eiendommer, sette sine forbruker-spesifikke egenskaper, og deretter bruke den til å lage et nytt objekt av KafkaConsumer. Klassen ConsumerConfig definerer alle egenskapene vi kan angi. Det er bare fire obligatoriske egenskaper:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (verdi. Deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Akkurat som vi gjorde for produsentklassen, vil vi bruke BOOTSTRAP_SERVERS_CONFIG for å konfigurere verts- / portparene for forbrukerklassen. Denne konfigurasjonen lar oss etablere de første tilkoblingene til Kakfa-klyngen i host1: port1, host2: port2, ... format.

Som jeg tidligere har nevnt, forventer Kafka-serveren meldinger i byte [] nøkkel og byte [] verdiformater, og har sin egen implementering for å serieisere forskjellige typer til byte []. Akkurat som vi gjorde med produsenten, må vi bruke en tilpasset deserializer for å konvertere på forbrukersiden byte [] tilbake til riktig type.

Når det gjelder eksempelapplikasjonen, vet vi at produsenten bruker ByteArraySerializer for nøkkelen og StringSerializer for verdien. På klientsiden må vi derfor bruke org.apache.kafka.common.serialization.ByteArrayDeserializer for nøkkelen og org.apache.kafka.common.serialization.StringDeserializer for verdien. Angi disse klassene som verdier for KEY_DESERIALIZER_CLASS_CONFIG og VALUE_DESERIALIZER_CLASS_CONFIG vil gjøre det mulig for forbrukeren å deserialisere byte [] kodede typer sendt av produsenten.

Til slutt må vi sette verdien på GROUP_ID_CONFIG. Dette skal være et gruppenavn i strengformat. Jeg forklarer mer om denne konfigurasjonen om et øyeblikk. Foreløpig er det bare å se på Kafka-forbrukeren med de fire obligatoriske egenskapene:

$config[zx-auto] not found$config[zx-overlay] not found