Programmering

Hvordan lage stateful streaming-applikasjoner med Apache Flink

Fabian Hueske er committer og PMC-medlem av Apache Flink-prosjektet og medstifter av Data Artisans.

Apache Flink er et rammeverk for å implementere stateful stream processing applikasjoner og kjøre dem i skala i en beregningsklynge. I en tidligere artikkel undersøkte vi hva statelig strømbehandling er, hvilke brukssaker den adresserer, og hvorfor du bør implementere og kjøre streamingapplikasjonene dine med Apache Flink.

I denne artikkelen vil jeg presentere eksempler på to vanlige brukstilfeller av stateful stream processing og diskutere hvordan de kan implementeres med Flink. Den første brukssaken er hendelsesdrevne applikasjoner, dvs. applikasjoner som inntar kontinuerlige strømmer av hendelser og bruker noen forretningslogikk på disse hendelsene. Det andre er streaminganalysesaken, hvor jeg vil presentere to analytiske spørsmål implementert med Flinks SQL API, som samler streamingdata i sanntid. Vi i Data Artisans gir kildekoden til alle eksemplene våre i et offentlig GitHub-arkiv.

Før vi dykker ned i detaljene i eksemplene, vil jeg introdusere hendelsesstrømmen som inntas av eksempelapplikasjonene og forklare hvordan du kan kjøre koden vi gir.

En strøm av drosjeturer

Eksempelapplikasjonene våre er basert på et offentlig datasett om drosjeturer som skjedde i New York City i 2013. Arrangørene av 2015 DEBS (ACM International Conference on Distributed Event-Based Systems) Grand Challenge omorganiserte det originale datasettet og konverterte det til en enkelt CSV-fil som vi leser følgende ni felt fra.

  • Medaljong - en MD5-sum id av drosjen
  • Hack_license - en MD5-sum-id for drosjelisensen
  • Pickup_datetime - tiden da passasjerene ble hentet
  • Dropoff_datetime - tiden da passasjerene ble kastet av
  • Pickup_longitude - lengden på hentestedet
  • Pickup_latitude - breddegraden til hentestedet
  • Dropoff_longitude - lengdegraden til avreisestedet
  • Dropoff_latitude - breddegraden til avreisestedet
  • Totalbeløp - totalt betalt i dollar

CSV-filen lagrer postene i stigende rekkefølge av attributtet for frafallstid. Derfor kan filen behandles som en ordnet logg over hendelser som ble publisert da en tur ble avsluttet. For å kjøre eksemplene vi gir på GitHub, må du laste ned datasettet til DEBS-utfordringen fra Google Drive.

Alle eksempler på applikasjoner leser CSV-filen sekvensielt og inntar den som en strøm av taxiturhendelser. Derfra og videre behandler applikasjonene hendelsene akkurat som alle andre strømmer, dvs. som en strøm som blir inntatt fra et loggbasert publiseringsabonnementsystem, som Apache Kafka eller Kinesis. Å lese en fil (eller en hvilken som helst annen type vedvarende data) og behandle den som en strøm er faktisk en hjørnestein i Flinks tilnærming til å forene batch- og strømbehandling.

Kjører Flink-eksemplene

Som nevnt tidligere publiserte vi kildekoden til eksemplene våre i et GitHub-arkiv. Vi oppfordrer deg til å forkaste og klone depotet. Eksemplene kan enkelt utføres fra din IDE du velger; du trenger ikke å sette opp og konfigurere en Flink-klynge for å kjøre dem. Først importerer du kildekoden til eksemplene som et Maven-prosjekt. Utfør deretter hovedklassen til et program og oppgi lagringsplasseringen til datafilen (se lenken for nedlasting av dataene ovenfor) som en programparameter.

Når du har startet en applikasjon, vil den starte en lokal, innebygd Flink-forekomst inne i applikasjonens JVM-prosess og sende inn søknaden for å utføre den. Du vil se en rekke loggutsagn mens Flink starter og jobbens oppgaver planlegges. Når applikasjonen er i gang, vil utdataene skrives til standardutgangen.

Bygge en hendelsesdrevet applikasjon i Flink

La oss nå diskutere vår første brukssak, som er en hendelsesdrevet applikasjon. Hendelsesdrevne applikasjoner inntar strømmer av hendelser, utfører beregninger når hendelsene mottas, og kan avgi nye hendelser eller utløse eksterne handlinger. Flere hendelsesdrevne applikasjoner kan komponeres ved å koble dem sammen via hendelsesloggsystemer, i likhet med hvordan store systemer kan sammensettes fra mikrotjenester. Hendelsesdrevne applikasjoner, hendelseslogger og øyeblikksbilder av applikasjonstilstand (kjent som savepoints i Flink) består av et veldig kraftig designmønster fordi du kan tilbakestille tilstanden og spille av innspillene for å gjenopprette fra en feil, for å fikse en feil eller for å migrere applikasjon til en annen klynge.

I denne artikkelen vil vi undersøke et hendelsesdrevet program som støtter en tjeneste som overvåker arbeidstiden til drosjesjåfører. I 2016 bestemte NYC Taxi and Limousine Commission å begrense arbeidstiden til drosjesjåfører til 12 timers skift og kreve en pause på minst åtte timer før neste skift kan startes. Et skift starter med begynnelsen av den første turen. Fra da av kan en sjåfør starte nye turer innen et vindu på 12 timer. Søknaden vår sporer kjørerturer, markerer sluttiden til 12-timersvinduet (dvs. tidspunktet da de kan starte den siste turen), og flaggturer som bryter med reguleringen. Du finner den fullstendige kildekoden til dette eksemplet i GitHub-depotet vårt.

Vår applikasjon er implementert med Flinks DataStream API og en KeyedProcessFunction. DataStream API er en funksjonell API og basert på konseptet med typede datastrømmer. EN Data strøm er den logiske representasjonen av en strøm av hendelser av typen T. En strøm behandles ved å bruke en funksjon til den som produserer en annen datastrøm, muligens av en annen type. Flink behandler strømmer parallelt ved å distribuere hendelser for å streame partisjoner og bruke forskjellige forekomster av funksjoner til hver partisjon.

Følgende kodebit viser den høye strømmen av overvåkingsprogrammet vårt.

// innta strøm av drosjeturer.

DataStream rides = TaxiRides.getRides (env, inputPath);

Data strøm varsler = turer

// partisjonsstrøm etter førerkort-ID

.keyBy (r -> r.licenseId)

// overvåke ridehendelser og generere varsler

.prosess (ny MonitorWorkTime ());

// skriv ut varsler

notifications.print ();

Søknaden begynner å innta en strøm av drosjeturer. I vårt eksempel blir hendelsene lest fra en tekstfil, analysert og lagret i TaxiRide POJO-objekter. En applikasjon fra den virkelige verden vil vanligvis innta hendelsene fra en meldingskø eller hendelseslogg, for eksempel Apache Kafka eller Pravega. Det neste trinnet er å taste inn TaxiRide hendelser av lisensId av sjåføren. De keyBy operasjon partisjonerer strømmen på det deklarerte feltet, slik at alle hendelser med samme nøkkel behandles av samme parallelle forekomst av følgende funksjon. I vårt tilfelle deler vi på lisensId felt fordi vi ønsker å overvåke arbeidstiden til hver enkelt sjåfør.

Deretter bruker vi MonitorWorkTime funksjon på partisjonert TaxiRide arrangementer. Funksjonen sporer turene per sjåfør og overvåker skift og pausetid. Den avgir hendelser av typen Tuple2, hvor hver tuple representerer et varsel som består av førerkortets ID og en melding. Til slutt sender applikasjonen vår meldingene ved å skrive dem ut til standardutdata. En applikasjon fra den virkelige verden vil skrive varslene til en ekstern melding eller et lagringssystem, som Apache Kafka, HDFS eller et databasesystem, eller utløse en ekstern samtale for å umiddelbart skyve dem ut.

Nå som vi har diskutert den generelle strømmen av applikasjonen, la oss ta en titt på MonitorWorkTime funksjon, som inneholder det meste av applikasjonens faktiske forretningslogikk. De MonitorWorkTime funksjon er en stateful KeyedProcessFunction som inntar TaxiRide hendelser og utslipp Tuple2 poster. De KeyedProcessFunction grensesnittet har to metoder for å behandle data: processElement () og onTimer (). De processElement () metoden kalles for hver ankomsthendelse. De onTimer () metoden kalles når en tidligere registrert tidtaker utløses. Følgende utdrag viser skjelettet til MonitorWorkTime funksjon og alt som er erklært utenfor behandlingsmetodene.

offentlig statisk klasse MonitorWorkTime

utvider KeyedProcessFunction {

// tidskonstanter i millisekunder

privat statisk finale lang ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 timer

privat statisk endelig lang REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 timer

privat statisk endelig lang CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 timer

privat forbigående DateTimeFormatter formater;

// statushåndtak for å lagre starttidspunktet for et skift

ValueState shiftStart;

@Overstyring

offentlig tomrom åpent (Configuration conf) {

// registrer statlig håndtak

shiftStart = getRuntimeContext (). getState (

ny ValueStateDescriptor (“shiftStart”, Types.LONG));

// initialisere tidsformatering

this.formatter = DateTimeFormat.forPattern (“åååå-MM-dd HH: mm: ss”);

  }

// processElement () og onTimer () blir diskutert i detalj nedenfor.

}

Funksjonen erklærer noen få konstanter for tidsintervaller i millisekunder, en tidsformatering og et tilstandshåndtak for tastet tilstand som styres av Flink. Administrert tilstand kontrolleres regelmessig og gjenopprettes automatisk i tilfelle feil. Tastetilstand er organisert per nøkkel, noe som betyr at en funksjon vil opprettholde en verdi per håndtak og nøkkel. I vårt tilfelle er MonitorWorkTime funksjonen opprettholder en Lang verdi for hver nøkkel, dvs. for hver lisensId. De shiftStart staten lagrer starttidspunktet for førerens skift. Statens håndtak initialiseres i åpen() metode, som kalles en gang før den første hendelsen er behandlet.

La oss ta en titt på processElement () metode.

@Overstyring

offentlig ugyldig prosessElement (

TaxiRide-tur,

Kontekst ctx,

Samler ut) kaster Unntak {

// slå opp starttiden for siste skift

Lange startTs = shiftStart.value ();

hvis (startTs == null ||

startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

// dette er den første turen til et nytt skift.

startTs = ride.pickUpTime;

shiftStart.update (startTs);

lange endTs = startTs + ALLOWED_WORK_TIME;

out.collect (Tuple2.of (ride.licenseId,

"Du har lov til å ta imot nye passasjerer til" + formatter.print (endTs)));

// registrer timer for å rydde opp i staten om 24 timer

ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

} annet hvis (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

// denne turen startet etter at den tillatte arbeidstiden var over.

// det er et brudd på regelverket!

out.collect (Tuple2.of (ride.licenseId,

“Denne turen brøt med arbeidstidsbestemmelsene.”));

  }

}

De processElement () metoden kalles for hver TaxiRide begivenhet. Først henter metoden starttidspunktet for førerens skifte fra statushåndtaket. Hvis staten ikke inneholder en starttid (startTs == null) eller hvis siste skift startet mer enn 20 timer (ALLOWED_WORK_TIME + REQ_BREAK_TIME) tidligere enn den nåværende turen, er den nåværende turen den første turen i et nytt skift. I begge tilfeller starter funksjonen et nytt skift ved å oppdatere starttiden for skiftet til starttidspunktet for den aktuelle turen, sender en melding til sjåføren med sluttiden for det nye skiftet, og registrerer en tidtaker for å rydde opp i tilstand på 24 timer.

Hvis den nåværende turen ikke er den første turen på et nytt skift, kontrollerer funksjonen om den bryter med arbeidstidsreguleringen, dvs. om den startet mer enn 12 timer senere enn starten på førerens nåværende skift. Hvis det er tilfelle, sender funksjonen en melding for å informere sjåføren om overtredelsen.

De processElement () metoden for MonitorWorkTime funksjon registrerer en tidtaker for å rydde opp i tilstanden 24 timer etter starten av et skift. Det er viktig å fjerne tilstand som ikke lenger er nødvendig for å forhindre voksende tilstandsstørrelser på grunn av lekkasje. En tidtaker utløses når tidspunktet for applikasjonen passerer tidsuret for tidsuret. På det tidspunktet, onTimer () metoden kalles. I likhet med tilstand opprettholdes tidtakere per nøkkel, og funksjonen settes i sammenheng med den tilhørende nøkkelen før onTimer () metoden kalles. Derfor ledes all tilstandstilgang til nøkkelen som var aktiv da timeren ble registrert.

La oss ta en titt på onTimer () Metode av MonitorWorkTime.

@Overstyring

offentlig ugyldighet påTimer (

lange timer

OnTimerContext ctx,

Samler ut) kaster Unntak {

// fjern skifttilstanden hvis ingen nye skift allerede var startet.

Lange startTs = shiftStart.value ();

hvis (startTs == timerTs - CLEAN_UP_INTERVAL) {

shiftStart.clear ();

  }

}

De processElement () metoden registrerer timere i 24 timer etter at et skift begynte å rydde opp i tilstanden som ikke lenger er nødvendig. Å rydde opp i staten er den eneste logikken som onTimer () metoden implementerer. Når en tidtaker skyter, sjekker vi om sjåføren startet et nytt skift i mellomtiden, dvs. om skiftets starttid endret seg. Hvis det ikke er tilfelle, tømmer vi skiftetilstanden for sjåføren.

$config[zx-auto] not found$config[zx-overlay] not found