Programmering

Java - gjenkjenning og håndtering av hengende tråder

Av Alex. C. Punnen

Arkitekt - Nokia Siemens Networks

Bangalore

Hengende tråder er en vanlig utfordring i utviklingen av programvare som må grensesnitt med proprietære enheter ved bruk av proprietære eller standardiserte grensesnitt som SNMP, Q3 eller Telnet. Dette problemet er ikke begrenset til nettverksadministrasjon, men forekommer over et bredt spekter av felt som webservere, prosesser som påkaller eksterne prosedyreanrop og så videre.

En tråd som initierer en forespørsel til en enhet trenger en mekanisme for å oppdage i tilfelle enheten ikke svarer eller bare svarer delvis. I noen tilfeller der en slik henging oppdages, må en spesifikk handling utføres. Den spesifikke handlingen kan enten være prøveversjon på nytt eller fortelle sluttbrukeren om oppgavefeilen eller et annet gjenopprettingsalternativ. I noen tilfeller der et stort antall oppgaver må avfyres til et stort antall nettverkselementer av en komponent, er hengende gjenkjenning viktig slik at den ikke blir en flaskehals for annen oppgavebehandling. Så det er to aspekter ved håndtering av hengende tråder: opptreden og melding.

For varslingsaspekt vi kan skreddersy Java Observer-mønsteret slik at det passer inn i den flertrådede verdenen.

Skreddersy Java Observer-mønster til flertrådede systemer

På grunn av hengende oppgaver, bruker du Java ThreadPool klasse med en passende strategi er den første løsningen som kommer til hjernen. Imidlertid bruker du Java ThreadPool i sammenheng med noen tråder som tilfeldig henger over en periode gir uønsket oppførsel basert på den spesifikke strategien som brukes, som trådsult i tilfelle av fast trådbassengstrategi. Dette skyldes hovedsakelig det faktum at Java ThreadPool har ikke en mekanisme for å oppdage en trådhengning.

Vi kan prøve et hurtigbufret trådbasseng, men det har også problemer. Hvis det er høy grad av oppgavefyring, og noen tråder henger, kan antall tråder skyte opp, og til slutt forårsake ressurssult og unntak utenfor minnet. Eller vi kan bruke en tilpasset ThreadPool strategi som påkaller en CallerRunsPolicy. I dette tilfellet kan en trådhengning også føre til at alle trådene til slutt henger. (Hovedtråden skal aldri være innringeren, siden det er en mulighet for at en hvilken som helst oppgave som sendes til hovedtråden kan henge og føre til at alt sliper.)

Så, hva er løsningen? Jeg skal demonstrere et ikke så enkelt ThreadPool-mønster som justerer bassengstørrelsen i henhold til oppgavehastigheten og basert på antall hengende tråder. La oss først gå til problemet med å oppdage hengende tråder.

Oppdage hengende tråder

Figur 1 viser en abstraksjon av mønsteret:

Det er to viktige klasser her: ThreadManager og ManagedThread. Begge strekker seg fra Java Tråd klasse. De ThreadManager holder en container som holder ManagedThreads. Når en ny ManagedThread er opprettet, legger den seg til denne beholderen.

 ThreadHangTester testthread = new ThreadHangTester ("threadhangertest", 2000, false); testthread.start (); thrdManger.manage (testtråd, ThreadManager.RESTART_THREAD, 10); thrdManger.start (); 

De ThreadManager det går gjennom denne listen og kaller ManagedThreads isHung () metode. Dette er i utgangspunktet en tidsstempel-sjekklogikk.

 hvis (System.currentTimeMillis () - lastprocessingtime.get ()> maxprocessingtime) {logger.debug ("Tråden er hengt"); returner sant; } 

Hvis den finner ut at en tråd har gått inn i en oppgavesløyfe og aldri oppdatert resultatene, tar den en gjenopprettingsmekanisme som fastsatt av ManageThread.

 mens (isRunning) {for (Iterator iterator = managedThreads.iterator (); iterator.hasNext ();) {ManagedThreadData thrddata = (ManagedThreadData) iterator.next (); if (thrddata.getManagedThread (). isHung ()) {logger.warn ("Thread Hang detected for ThreadName =" + thrddata.getManagedThread (). getName ()); switch (thrddata.getManagedAction ()) {case RESTART_THREAD: // Handlingen her er å starte tråden på nytt // fjerne fra manager iterator.remove (); // stoppe behandlingen av denne tråden hvis mulig thrddata.getManagedThread (). stopProcessing (); if (thrddata.getManagedThread (). getClass () == ThreadHangTester.class) // Å vite hvilken type tråd du vil opprette {ThreadHangTester newThread = new ThreadHangTester ("restarted_ThrdHangTest", 5000, true); // Opprett en ny tråd newThread.start (); // legg den tilbake for å administreres administrere (newThread, thrddata.getManagedAction (), thrddata.getThreadChecktime ()); } gå i stykker; ......... 

For en ny ManagedThread skal opprettes og brukes i stedet for den hengte, den skal ikke inneholde noen tilstand eller noen container. For dette beholderen som ManagedThread handlinger bør skilles ut. Her bruker vi det ENUM-baserte Singleton-mønsteret for å holde oppgavelisten. Så beholderen som holder oppgavene er uavhengig av tråden som behandler oppgavene. Klikk på følgende lenke for å laste ned kilden for mønsteret som er beskrevet: Java Thread Manager Source.

Hengende tråder og Java ThreadPool-strategier

Java ThreadPool har ikke en mekanisme for å oppdage hengende tråder. Ved hjelp av en strategi som fast threadpool (Executors.newFixedThreadPool ()) vil ikke fungere fordi hvis noen oppgaver henger over tid, vil alle trådene til slutt være i en hengt tilstand. Et annet alternativ er å bruke en hurtigbufret ThreadPool-policy (Executors.newCachedThreadPool ()). Dette kan sikre at det alltid vil være tråder tilgjengelig for å behandle en oppgave, bare begrenset av VM-minne, CPU og trådgrenser. Imidlertid, med denne policyen er det ingen kontroll over antall tråder som blir opprettet. Uansett om en behandlingstråd henger eller ikke, fører bruk av denne policyen mens oppgaven er høy til at et stort antall tråder blir opprettet. Hvis du ikke har nok ressurser for JVM veldig snart, vil du treffe maksimal minneterskel eller høy CPU. Det er ganske vanlig å se antall tråder som treffer hundrevis eller tusenvis. Selv om de blir utgitt når oppgaven er behandlet, vil det store antallet tråder noen ganger overvelde systemressursene under burst-håndtering.

Et tredje alternativ er å bruke tilpassede strategier eller policyer. Et slikt alternativ er å ha et trådområde som skalerer fra 0 til noe maksimalt antall. Så selv om en tråd henges, vil en ny tråd opprettes så lenge det maksimale antall trådene ble nådd:

 execexec = new ThreadPoolExecutor (0, 3, 60, TimeUnit.SECONDS, new SynchronousQueue ()); 

Her 3 er det maksimale antall tråder og levetid er satt til 60 sekunder, da dette er en oppgaveintensiv prosess. Hvis vi gir høy nok maksimal trådtelling, er dette mer eller mindre en rimelig policy å bruke i sammenheng med hengende oppgaver. Det eneste problemet er at hvis de hengende trådene ikke frigjøres til slutt, er det en liten sjanse for at alle trådene på et tidspunkt kan henge. Hvis maksimale tråder er tilstrekkelig høye og antar at en oppgave henger er et sjeldent fenomen, vil denne politikken passe regningen.

Det hadde vært søtt om ThreadPool hadde også en pluggbar mekanisme for å oppdage hengende tråder. Jeg vil diskutere en slik design senere. Selvfølgelig, hvis alle trådene er fryset opp, kan du konfigurere og bruke den avviste oppgavepolitikken for trådgruppen. Hvis du ikke vil forkaste oppgavene, må du bruke CallerRunsPolicy:

 execexec = new ThreadPoolExecutor (0, 20, 20, TimeUnit.MILLISECONDS, new SynchronousQueue () new ThreadPoolExecutor.CallerRunsPolicy ()); 

I dette tilfellet, hvis en trådhenging førte til at en oppgave ble avvist, ville denne oppgaven blitt gitt til ringetråden som skal håndteres. Det er alltid en sjanse for at oppgaven også henger. I dette tilfellet vil hele prosessen fryse. Så det er bedre å ikke legge til en slik policy i denne sammenhengen.

 offentlig klasse NotificationProcessor implementerer Runnable {private final NotificationOriginator notificationOrginator; boolsk isRunning = true; privat slutt ExecutorService execexec; AlarmNotificationProcessor (NotificationOriginator norginator) {// ctor // execexec = Executors.newCachedThreadPool (); // For mange tråder // execexec = Executors.newFixedThreadPool (2); //, ingen påkjenning av hang-oppgaver execexec = new ThreadPoolExecutor (0, 4 , 250, TimeUnit.MILLISECONDS, new SynchronousQueue (), new ThreadPoolExecutor.CallerRunsPolicy ()); } public void run () {while (isRunning) {try {final Task task = TaskQueue.INSTANCE.getTask (); Runnable thisTrap = new Runnable () {public void run () {++ alarmid; notificaionOrginator.notify (ny OctetString (), // Oppgavebehandling nbialarmnew.getOID (), nbialarmnew.createVariableBindingPayload ()); É ........}}; execexec.execute (thisTrap); } 

En tilpasset trådpool med hangdeteksjon

Et trådbassengbibliotek med muligheten for oppgaveløsning og håndtering vil være flott å ha. Jeg har utviklet en, og jeg vil demonstrere den nedenfor. Dette er faktisk en port fra en C ++ tråd pool som jeg designet og brukte for en tid tilbake (se referanser). I utgangspunktet bruker denne løsningen kommandomønsteret og kjeden av ansvarsmønster. Imidlertid er det litt vanskelig å implementere kommandomønsteret i Java uten hjelp av funksjonsobjektstøtte. For dette måtte jeg endre implementeringen litt for å bruke Java-refleksjon. Legg merke til at konteksten som dette mønsteret ble designet i, var hvor et trådbasseng måtte monteres inn / plugges inn uten å endre noen av de eksisterende klassene. (Jeg tror den ene store fordelen med objektorientert programmering er at den gir oss en måte å designe klasser for å gjøre effektiv bruk av Open Closed Principle. Dette gjelder spesielt for kompleks gammel arvskode og kan være av mindre relevans for ny produktutvikling.) Derfor brukte jeg refleksjon i stedet for å bruke et grensesnitt for å implementere kommandomønsteret. Resten av koden kan porteres uten større endringer ettersom nesten alle trådsynkroniserings- og signalprimitivene er tilgjengelige i Java 1.5 og utover.

 offentlig klasse Command {private Object [] argParameter; ........ // Ctor for en metode med to args Kommando (T pObj, String methodName, lang timeout, String key, int arg1, int arg2) {m_objptr = pObj; m_methodName = mthodName; m_timeout = timeout; m_key = nøkkel; argParameter = nytt objekt [2]; argParameter [0] = arg1; argParameter [1] = arg2; } // Kaller metoden til objektet void execute () {Class klass = m_objptr.getClass (); Class [] paramTypes = new Class [] {int.class, int.class}; prøv {Method methodName = klass.getMethod (m_methodName, paramTypes); //System.out.println("Funnet metoden -> "+ methodName); hvis (argParameter.length == 2) {methodName.invoke (m_objptr, (Object) argParameter [0], (Object) argParameter [1]); } 

Eksempel på bruk av dette mønsteret:

 offentlig klasse CTask {.. public int DoSomething (int a, int b) {...}} 

Kommando cmd4 = ny kommando (oppgave4, "DoMultiplication", 1, "key2", 2,5);

Nå har vi to viktige klasser her. Den ene er den Trådkjede klasse, som implementerer Chain of Responsibility-mønsteret:

 offentlig klasse ThreadChain implementerer Runnable {public ThreadChain (ThreadChain p, ThreadPool pool, String name) {AddRef (); deleteMe = false; opptatt = falsk; // -> veldig viktig neste = p; // sett trådkjeden - merk at dette er som en lenket liste impl threadpool = pool; // set the thread pool - Root of the threadpool ........ threadId = ++ ThreadId; ...... // start tråden thisThread = ny tråd (dette, navn + inttid.toString ()); thisThread.start (); } 

Denne klassen har to hovedmetoder. Den ene er boolsk Kan tåle() som er initiert av ThreadPool klasse og fortsetter deretter rekursivt. Dette sjekker om gjeldende tråd (gjeldende Trådkjede eksempel) er fri til å håndtere oppgaven. Hvis den allerede håndterer en oppgave, kaller den den neste i kjeden.

 public Boolean canHandle () {if (! busy) {// Hvis ikke opptatt System.out.println ("Can Handle This Event in id =" + threadId); // todo signal en hendelse prøv {condLock.lock (); condWait.signal (); // Signaler HandleRequest som venter på dette i kjøremetoden .................................... ..... returner sant; } ......................................... /// Se om den neste objektet i kjeden er gratis /// for å håndtere forespørselen tilbake neste.canHandle (); 

Merk at HandleRequest er en metode for Trådkjede som påkalles fra den Trådløp () metode og venter på signalet fra kan tåle metode. Legg også merke til hvordan oppgaven håndteres via kommandomønsteret.

$config[zx-auto] not found$config[zx-overlay] not found