Programmering

Hvordan bruke Redis for sanntids strømbehandling

Roshan Kumar er senior produktsjef i Redis Labs.

Sanntids streaming av datainntak er et vanlig krav i mange store datasaker. I felt som IoT, e-handel, sikkerhet, kommunikasjon, underholdning, økonomi og detaljhandel, hvor så mye avhenger av rettidig og nøyaktig datadrevet beslutningstaking, er sanntids datainnsamling og analyse faktisk kjernen i virksomheten.

Imidlertid gir innsamling, lagring og behandling av strømmedata i store volumer og med høy hastighet arkitektoniske utfordringer. Et viktig første skritt i å levere sanntids dataanalyse er å sikre at tilstrekkelige nettverks-, beregnings-, lagrings- og minnesressurser er tilgjengelige for å fange raske datastrømmer. Men et selskaps programvarestabel må matche ytelsen til den fysiske infrastrukturen. Ellers vil bedrifter møte et enormt etterslep av data, eller verre, manglende eller ufullstendige data.

Redis har blitt et populært valg for slike raske scenarier for datainntak. Redis er en lett databaseplattform i minnet, og oppnår gjennomstrømning i millioner av operasjoner per sekund med forsinkelser på under millisekunder, mens du trekker på minimale ressurser. Det tilbyr også enkle implementeringer, aktivert av flere datastrukturer og funksjoner.

I denne artikkelen vil jeg vise hvordan Redis Enterprise kan løse vanlige utfordringer knyttet til inntak og behandling av store mengder data med høy hastighet. Vi går gjennom tre forskjellige tilnærminger (inkludert kode) for å behandle en Twitter-feed i sanntid, ved å bruke henholdsvis Redis Pub / Sub, Redis Lists og Redis Sorted Sets. Som vi får se, har alle tre metodene en rolle å spille i rask datainntak, avhengig av brukssaken.

Utfordringer med å designe raske datainntaksløsninger

Inntak av høyhastighets data innebærer ofte flere forskjellige typer kompleksitet:

  • Store mengder data som noen ganger kommer i utbrudd. Bursty data krever en løsning som er i stand til å behandle store datamengder med minimal ventetid. Ideelt sett bør den kunne utføre millioner av skriv per sekund med forsinkelse på under millisekunder, ved å bruke minimale ressurser.
  • Data fra flere kilder. Datainntaksløsninger må være fleksible nok til å håndtere data i mange forskjellige formater, og beholde kildens identitet om nødvendig og transformere eller normalisere i sanntid.
  • Data som må filtreres, analyseres eller videresendes. De fleste datainntaksløsninger har en eller flere abonnenter som bruker dataene. Dette er ofte forskjellige applikasjoner som fungerer på samme eller forskjellige steder med et variert sett med forutsetninger. I slike tilfeller trenger databasen ikke bare å transformere dataene, men også filtrere eller samle avhengig av kravene til forbruksapplikasjonene.
  • Data som kommer fra geografisk distribuerte kilder. I dette scenariet er det ofte praktisk å distribuere nodene for datainnsamling og plassere dem nær kildene. Selve nodene blir en del av den raske datainntaksløsningen, for å samle inn, behandle, videresende eller omdirigere inntaksdata.

Håndtering av raske datainntak i Redis

Mange løsninger som støtter rask datainntak i dag er komplekse, funksjonsrike og overkonstruerte for enkle krav. Redis er derimot ekstremt lett, rask og enkel å bruke. Med klienter tilgjengelig på mer enn 60 språk kan Redis enkelt integreres med de populære programvarestakkene.

Redis tilbyr datastrukturer som lister, sett, sorterte sett og hasker som tilbyr enkel og allsidig databehandling. Redis leverer mer enn en million lese / skrive-operasjoner per sekund, med forsinkelse på under millisekunder på en beskjeden størrelse skyutvikling, noe som gjør den ekstremt ressurseffektiv for store datamengder. Redis støtter også meldingstjenester og klientbiblioteker på alle de populære programmeringsspråkene, noe som gjør det godt egnet for å kombinere høyhastighets datainntak og sanntidsanalyse. Redis Pub / Sub-kommandoer gjør det mulig å spille rollen som en meldingsmegler mellom utgivere og abonnenter, en funksjon som ofte brukes til å sende varsler eller meldinger mellom distribuerte datainntaksnoder.

Redis Enterprise forbedrer Redis med sømløs skalering, alltid tilgjengelig tilgjengelighet, automatisk distribusjon og muligheten til å bruke kostnadseffektivt flashminne som en RAM-utvider, slik at behandlingen av store datasett kan oppnås kostnadseffektivt.

I avsnittene nedenfor vil jeg skissere hvordan du bruker Redis Enterprise til å takle vanlige utfordringer med datainntak.

Redis med hastigheten på Twitter

For å illustrere enkelheten til Redis, vil vi utforske en prøve med hurtig datainntaksløsning som samler meldinger fra en Twitter-feed. Målet med denne løsningen er å behandle tweets i sanntid og skyve dem ned i røret når de behandles.

Twitter-data som inntas av løsningen, forbrukes deretter av flere prosessorer langs linjen. Som vist i figur 1, handler dette eksemplet om to prosessorer - den engelske Tweet-prosessoren og influensorprosessoren. Hver prosessor filtrerer tweets og sender dem ned i sine respektive kanaler til andre forbrukere. Denne kjeden kan gå så langt som løsningen krever. Imidlertid stopper vi i vårt eksempel på tredje nivå, hvor vi samler populære diskusjoner blant engelsktalende og topp påvirkere.

Redis Labs

Vær oppmerksom på at vi bruker eksemplet på behandling av Twitter-feeder på grunn av hastigheten på dataankomst og enkelhet. Legg også merke til at Twitter-data når vår raske datainntak via en enkelt kanal. I mange tilfeller, for eksempel IoT, kan det være flere datakilder som sender data til hovedmottakeren.

Det er tre mulige måter å implementere denne løsningen på ved hjelp av Redis: inntak med Redis Pub / Sub, inntak med datastrukturen List, eller inntak med datastrukturen Sortert sett. La oss undersøke hvert av disse alternativene.

Inntak med Redis Pub / Sub

Dette er den enkleste implementeringen av rask datainntak. Denne løsningen bruker Redis's Pub / Sub-funksjon, som lar applikasjoner publisere og abonnere på meldinger. Som vist i figur 2, behandler hvert trinn dataene og publiserer dem til en kanal. Det påfølgende trinnet abonnerer på kanalen og mottar meldingene for videre behandling eller filtrering.

Redis Labs

Fordeler

  • Enkel å implementere.
  • Fungerer bra når datakilder og prosessorer distribueres geografisk.

Ulemper

  • Løsningen krever at forlagene og abonnentene er oppe hele tiden. Abonnenter mister data når de stoppes, eller når forbindelsen går tapt.
  • Det krever flere forbindelser. Et program kan ikke publisere og abonnere på den samme forbindelsen, så hver mellomliggende databehandler krever to tilkoblinger - en for å abonnere og en for å publisere. Hvis du kjører Redis på en DBaaS-plattform, er det viktig å kontrollere om pakken eller tjenestenivået ditt har noen grenser for antall tilkoblinger.

Et notat om forbindelser

Hvis mer enn en klient abonnerer på en kanal, skyver Redis dataene til hver klient lineært, den ene etter den andre. Store data nyttelaster og mange forbindelser kan innføre ventetid mellom en utgiver og abonnenter. Selv om standard hardgrense for maksimalt antall tilkoblinger er 10.000, må du teste og måle hvor mange tilkoblinger som passer for nyttelasten din.

Redis opprettholder en klientutgangsbuffer for hver klient. Standardgrensene for klientutgangsbufferen for Pub / Sub er satt som:

klient-utgang-buffer-grense pubsub 32 MB 8 MB 60

Med denne innstillingen vil Redis tvinge klienter til å koble fra under to forhold: hvis utgangsbufferen vokser utover 32 MB, eller hvis utgangsbufferen inneholder 8 MB data konsekvent i 60 sekunder.

Dette er indikasjoner på at klienter bruker dataene saktere enn de blir publisert. Skulle en slik situasjon oppstå, må du først prøve å optimalisere forbrukerne slik at de ikke legger til ventetid mens de bruker dataene. Hvis du merker at kundene dine fortsatt kobles fra, kan du øke grensene for client-output-buffer-limit pubsub eiendom i redis.conf. Vær oppmerksom på at eventuelle endringer i innstillingene kan øke ventetiden mellom utgiveren og abonnenten. Eventuelle endringer må testes og verifiseres grundig.

Kodedesign for Redis Pub / Sub-løsningen

Redis Labs

Dette er den enkleste av de tre løsningene som er beskrevet i denne artikkelen. Her er de viktige Java-klassene implementert for denne løsningen. Last ned kildekoden med full implementering her: //github.com/redislabsdemo/IngestPubSub.

De Abonnent klasse er kjerneklassen i dette designet. Hver Abonnent objekt opprettholder en ny forbindelse med Redis.

klasse Abonnent utvider JedisPubSub implementerer Runnable {

privat strengnavn;

private RedisConnection conn = null;

private Jedis jedis = null;

private String subscriberChannel;

offentlig abonnent (String subscriberName, String channelName) kaster Unntak {

navn = abonnentnavn;

subscriberChannel = kanalnavn;

Tråd t = ny tråd (denne);

t.start ();

       }

@Overstyring

offentlig tomkjøring () {

prøve{

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

mens (sant) {

jedis.subscribe (denne, this.subscriberChannel);

                      }

} fangst (unntak e) {

e.printStackTrace ();

              }

       }

@Overstyring

offentlig ugyldighet onMessage (strengkanal, strengmelding) {

super.onMessage (kanal, melding);

       }

}

De Forlegger klasse opprettholder en egen forbindelse til Redis for publisering av meldinger til en kanal.

public class Publisher {

RedisConnection conn = null;

Jedis jedis = null;

privat String kanal;

offentlig utgiver (String channelName) kaster Unntak {

kanal = kanalnavn;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

public void publish (String msg) kaster Unntak {

jedis.publish (kanal, msg);

       }

}

De EnglishTweetFilter, InfluencerTweetFilter, HashTagCollector, og InfluencerCollector filtre utvides Abonnent, som gjør dem i stand til å lytte til de innkommende kanalene. Siden du trenger separate Redis-tilkoblinger for å abonnere og publisere, har hver filterklasse sin egen RedisConnection gjenstand. Filtre lytter til de nye meldingene i kanalene sine i en løkke. Her er eksempelkoden til EnglishTweetFilter klasse:

offentlig klasse EnglishTweetFilter utvider abonnenten

{

private RedisConnection conn = null;

private Jedis jedis = null;

private String publisherChannel = null;

public EnglishTweetFilter (String name, String subscriberChannel, String publisherChannel) kaster Unntak {

super (navn, subscriberChannel);

this.publisherChannel = publisherChannel;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

@Overstyring

public void onMessage (String subscriberChannel, String message) {

JsonParser jsonParser = ny JsonParser ();

JsonElement jsonElement = jsonParser.parse (melding);

JsonObject jsonObject = jsonElement.getAsJsonObject ();

// filtermeldinger: publiser bare engelske tweets

hvis (jsonObject.get (“lang”)! = null &&

jsonObject.get (“lang”). getAsString (). er lik (“en”)) {

jedis.publish (publisherChannel, melding);

              }

       }

}

De Forlegger klasse har en publiseringsmetode som publiserer meldinger til ønsket kanal.

public class Publisher {

.

.     

public void publish (String msg) kaster Unntak {

jedis.publish (kanal, msg);

       }

.

}

Hovedklassen leser data fra inntaksstrømmen og legger den ut til AllData kanal. Hovedmetoden i denne klassen starter alle filterobjektene.

offentlig klasse IngestPubSub

{

.

offentlig ugyldig start () kaster unntak {

       .

       .

publisher = new Publisher (“AllData”);

englishFilter = new EnglishTweetFilter (“English Filter”, ”AllData”,

“EnglishTweets”);

influencerFilter = new InfluencerTweetFilter (“Influencer Filter”,

“AllData”, “InfluencerTweets”);

hashtagCollector = ny HashTagCollector (“Hashtag Collector”,

“EnglishTweets”);

influencerCollector = new InfluencerCollector (“Influencer Collector”,

“InfluencerTweets”);

       .

       .

}

Inntak med Redis-lister

Listedatastrukturen i Redis gjør det enkelt og greit å implementere en køløsning. I denne løsningen skyver produsenten hver melding på baksiden av køen, og abonnenten avspørrer køen og henter nye meldinger fra den andre enden.

Redis Labs

Fordeler

  • Denne metoden er pålitelig i tilfeller av tilkoblingstap. Når data er dyttet inn i listene, blir de bevart der til abonnentene leser dem. Dette gjelder selv om abonnentene blir stoppet eller mister forbindelsen til Redis-serveren.
  • Produsenter og forbrukere krever ingen forbindelse mellom dem.

Ulemper

  • Når data er hentet fra listen, fjernes de og kan ikke hentes igjen. Med mindre forbrukerne vedvarer dataene, går de tapt så snart de forbrukes.
  • Hver forbruker krever en egen kø, som krever lagring av flere kopier av dataene.

Kodedesign for Redis Lists-løsningen

Redis Labs

Du kan laste ned kildekoden for Redis Lists-løsningen her: //github.com/redislabsdemo/IngestList. Hovedklassene til denne løsningen er forklart nedenfor.

Meldingsliste innebærer Redis List-datastrukturen. De trykk() metoden skyver den nye meldingen til venstre for køen, og pop () venter på en ny melding fra høyre hvis køen er tom.

offentlig klasse MessageList {

beskyttet strengnavn = “MyList”; // Navn

.

.     

public void push (String msg) kaster Unntak {

jedis.lpush (navn, msg); // Venstre trykk

       }

offentlig String pop () kaster Unntak {

returner jedis.brpop (0, navn) .toString ();

       }

.

.

}

MessageListener er en abstrakt klasse som implementerer lytter- og forlagslogikk. EN MessageListener objektet lytter til bare en liste, men kan publisere til flere kanaler (MessageFilter gjenstander). Denne løsningen krever en separat MessageFilter objekt for hver abonnent nedover røret.

class MessageListener implementerer Runnable {

privat strengnavn = null;

privat MessageList inboundList = null;

Map outBoundMsgFilters = ny HashMap ();

.

.     

public void registerOutBoundMessageList (MessageFilter msgFilter) {

hvis (msgFilter! = null) {

hvis (outBoundMsgFilters.get (msgFilter.name) == null) {

outBoundMsgFilters.put (msgFilter.name, msgFilter);

                      }

              }

       }

.

.

@Overstyring

offentlig ugyldig kjøring () {

.

mens (sant) {

Streng msg = inboundList.pop ();

processMessage (msg);

                      }                                  

.

       }

.

beskyttet ugyldig pushMessage (streng msg) kaster unntak {

Sett outBoundMsgNames = outBoundMsgFilters.keySet ();

for (strengnavn: outBoundMsgNames) {

MessageFilter msgList = outBoundMsgFilters.get (navn);

msgList.filterAndPush (msg);

              }

       }

}

MessageFilter er en foreldreklasse som tilrettelegger for filterAndPush () metode. Når data strømmer gjennom inntakssystemet, blir de ofte filtrert eller transformert før de sendes til neste trinn. Klasser som utvider MessageFilter klasse overstyre filterAndPush () metode, og implementere sin egen logikk for å skyve den filtrerte meldingen til neste liste.

offentlig klasse MessageFilter {

MessageList messageList = null;

.

.

public void filterAndPush (String msg) kaster unntak {

messageList.push (msg);

       }

.

.     

}

AllTweetsListener er et eksempel på implementering av en MessageListener klasse. Dette lytter til alle tweets på AllData kanal, og publiserer dataene til EnglishTweetsFilter og InfluencerFilter.

offentlig klasse AllTweetsListener utvider MessageListener {

.

.     

offentlig statisk ugyldig hoved (String [] args) kaster Unntak {

MessageListener allTweetsProcessor = AllTweetsListener.getInstance ();

allTweetsProcessor.registerOutBoundMessageList (ny

EnglishTweetsFilter (“EnglishTweetsFilter”, “EnglishTweets”));

allTweetsProcessor.registerOutBoundMessageList (ny

InfluencerFilter (“InfluencerFilter”, “Influencers”);

allTweetsProcessor.start ();

       }

.

.

}

EnglishTweetsFilter strekker MessageFilter. Denne klassen implementerer logikk for å velge bare de tweets som er merket som engelske tweets. Filteret forkaster ikke-engelske tweets og skyver engelske tweets til neste liste.

offentlig klasse EnglishTweetsFilter utvider MessageFilter {

public EnglishTweetsFilter (String name, String listName) kaster unntak {

super (navn, listenavn);

       }

@Overstyring

public void filterAndPush (strengmelding) kaster unntak {

JsonParser jsonParser = ny JsonParser ();

JsonElement jsonElement = jsonParser.parse (melding);

JsonArray jsonArray = jsonElement.getAsJsonArray ();

JsonObject jsonObject = jsonArray.get (1) .getAsJsonObject ();

hvis (jsonObject.get (“lang”)! = null &&

jsonObject.get (“lang”). getAsString (). er lik (“en”)) {

Jedis jedis = super.getJedisInstance ();

hvis (jedis! = null) {

jedis.lpush (super.name, jsonObject.toString ());

                             }

              }

       }

}

$config[zx-auto] not found$config[zx-overlay] not found