Programovanie

Java - detekcia zavesenia vlákna a manipulácia s ním

Autor: Alex. C. Punnen

Architekt - Nokia Siemens Networks

Bangalore

Závesné vlákna sú bežnou výzvou pri vývoji softvéru, ktorý musí komunikovať s proprietárnymi zariadeniami pomocou proprietárnych alebo štandardizovaných rozhraní, ako sú SNMP, Q3 alebo Telnet. Tento problém sa neobmedzuje iba na správu siete, ale vyskytuje sa v širokej škále oblastí, ako sú webové servery, procesy vyvolávajúce vzdialené volania procedúr atď.

Vlákno, ktoré iniciuje požiadavku na zariadenie, potrebuje mechanizmus na detekciu v prípade, že zariadenie nereaguje alebo reaguje iba čiastočne. V niektorých prípadoch, keď sa zistí takéto zablokovanie, je potrebné vykonať konkrétnu akciu. Konkrétnou akciou môže byť opakovanie alebo informovanie koncového používateľa o zlyhaní úlohy alebo iná možnosť obnovenia. V niektorých prípadoch, keď musí komponent prepáliť veľký počet úloh na veľký počet sieťových prvkov, je dôležitá detekcia zavesenia vlákna, aby sa nestal prekážkou pre ďalšie spracovanie úloh. Správa závesných vlákien má teda dva aspekty: výkon a oznámenia.

Pre aspekt oznámenia môžeme prispôsobiť vzor Java Observer tak, aby zapadol do viacvláknového sveta.

Prispôsobenie vzoru pozorovateľa Java na viacvláknové systémy

Z dôvodu zavesenia úloh, používanie Java ThreadPool trieda s vhodnou stratégiou je prvé riešenie, ktoré mi napadne. Avšak pomocou Javy ThreadPool v súvislosti s niektorými vláknami náhodne visiacimi po určitú dobu dáva nežiaduce správanie na základe konkrétnej použitej stratégie, ako napríklad hladovanie vlákien v prípade stratégie stratégie pevných vlákien. Je to hlavne kvôli skutočnosti, že Java ThreadPool nemá mechanizmus na zisťovanie zablokovania vlákna.

Mohli by sme vyskúšať fond vlákien s vyrovnávacou pamäťou, ale má tiež problémy. Ak je vysoká rýchlosť vypaľovania úloh a niektoré vlákna visia, počet vlákien by mohol vystreliť nahor, čo by nakoniec mohlo spôsobiť nedostatok zdrojov a nedostatok pamäte. Alebo by sme mohli použiť Custom ThreadPool stratégia vyvolávajúca a CallerRunsPolicy. Aj v tomto prípade môže zavesenie vlákna spôsobiť, že sa všetky vlákna nakoniec zavesia. (Hlavné vlákno by nikdy nemalo byť volajúcim, pretože existuje možnosť, že akákoľvek úloha odovzdaná hlavnému vláknu môže visieť a spôsobiť, že sa všetko zastaví.)

Aké je teda riešenie? Ukážem nie príliš jednoduchý vzor ThreadPool, ktorý upravuje veľkosť bazénu podľa miery úloh a na základe počtu zavesených vlákien. Poďme najskôr k problému detekcie visiacich vlákien.

Zisťovanie visiacich vlákien

Obrázok 1 zobrazuje abstrakciu vzoru:

Existujú dve dôležité triedy: ThreadManager a ManagedThread. Oba vychádzajú z Javy Závit trieda. The ThreadManager má nádobu, ktorá drží ManagedThreads. Keď nový ManagedThread je vytvorený, pridá sa do tohto kontajnera.

 ThreadHangTester testthread = nový ThreadHangTester ("threadhangertest", 2000, false); testthread.start (); thrdManger.manage (testthread, ThreadManager.RESTART_THREAD, 10); thrdManger.start (); 

The ThreadManager iteruje týmto zoznamom a volá ManagedThreadje isHung () metóda. Toto je v podstate logika kontroly časovej pečiatky.

 if (System.currentTimeMillis () - lastprocessingtime.get ()> maxprocessingtime) {logger.debug ("Vlákno je zavesené"); návrat pravdivý; } 

Ak zistí, že sa vlákno dostalo do slučky úloh a nikdy neaktualizovalo svoje výsledky, použije mechanizmus obnovy, ako je stanovené v ManageThread.

 while (isRunning) {for (Iterator iterator = managedThreads.iterator (); iterator.hasNext ();) {ManagedThreadData thrddata = (ManagedThreadData) iterator.next (); if (thrddata.getManagedThread (). isHung ()) {logger.warn ("Zistilo sa zavesenie vlákna pre ThreadName =" + thrddata.getManagedThread (). getName ()); switch (thrddata.getManagedAction ()) {case RESTART_THREAD: // Tu je potrebné reštartovať vlákno // odstrániť z manažéra iterator.remove (); // pokiaľ je to možné, zastavte spracovanie tohto vlákna thrddata.getManagedThread (). stopProcessing (); if (thrddata.getManagedThread (). getClass () == ThreadHangTester.class) // Aby ste vedeli, aký typ vlákna chcete vytvoriť {ThreadHangTester newThread = nový ThreadHangTester ("restarted_ThrdHangTest", 5000, true); // Vytvorenie nového vlákna newThread.start (); // pridaj naspäť na správu spravovať (newThread, thrddata.getManagedAction (), thrddata.getThreadChecktime ()); } prestávka; ......... 

Za nový ManagedThread aby sa vytvoril a použil namiesto zaveseného, ​​nemal by obsahovať žiadny stav alebo nádobu. Pre tento kontajner, na ktorom ManagedThread činy by mali byť oddelené. Tu používame vzor Singleton založený na ENUM, aby sme držali zoznam úloh. Takže kontajner obsahujúci úlohy je nezávislý od vlákna spracovávajúceho úlohy. Kliknutím na nasledujúci odkaz stiahnete zdroj opísaného vzoru: Zdroj Java Thread Manager.

Závesné vlákna a stratégie Java ThreadPool

Java ThreadPool nemá mechanizmus na zisťovanie visiacich nití. Používanie stratégie ako pevný threadpool (Executors.newFixedThreadPool ()) nebude fungovať, pretože ak niektoré úlohy časom prestanú pracovať, všetky vlákna budú nakoniec v zablokovanom stave. Ďalšou možnosťou je použitie politiky ThreadPool vo vyrovnávacej pamäti (Executors.newCachedThreadPool ()). To by mohlo zaistiť, že na spracovanie úlohy budú vždy k dispozícii vlákna, ktoré budú obmedzené iba obmedzeniami pamäte VM, CPU a vlákien. Vďaka tejto politike však neexistuje žiadna kontrola nad počtom vytvorených vlákien. Bez ohľadu na to, či procesor vlákno zasekne alebo nie, použitie tejto politiky pri vysokej miere úloh vedie k vytvoreniu obrovského počtu vlákien. Ak nemáte dostatok prostriedkov na JVM veľmi skoro, narazíte na maximálnu prahovú hodnotu pamäte alebo vysoký procesor. Je celkom bežné vidieť počet vlákien zasiahnutých stovkami alebo tisíckami. Aj keď sú uvoľnené po spracovaní úlohy, niekedy počas spracovania burstov vysoký počet vlákien preplní systémové prostriedky.

Treťou možnosťou je použitie vlastných stratégií alebo politík. Jednou z takýchto možností je mať fond vlákien, ktorý sa prispôsobí od 0 do maximálneho počtu. Takže aj keby jedno vlákno viselo, nové vlákno by sa vytvorilo, pokiaľ by sa dosiahol maximálny počet vlákien:

 execexec = new ThreadPoolExecutor (0, 3, 60, TimeUnit.SECONDS, new SynchronousQueue ()); 

Tu 3 je maximálny počet vlákien a čas udržiavania nažive je nastavený na 60 sekúnd, pretože ide o proces náročný na úlohy. Ak dáme dostatočne vysoký maximálny počet vlákien, je to viac-menej rozumná politika, ktorú je možné použiť v súvislosti so závesnými úlohami. Jediným problémom je, že ak sa závesné vlákna nakoniec neuvoľnia, existuje malá šanca, že by všetky vlákna mohli v určitom okamihu visieť. Ak je maximálny počet vlákien dostatočne vysoký a predpokladáme, že zastavenie úlohy je zriedkavým javom, potom by sa táto politika hodila k návrhu zákona.

Bolo by to sladké, keby ThreadPool mal tiež zásuvný mechanizmus na zisťovanie visiacich nití. O jednom takom dizajne budem hovoriť neskôr. Samozrejme, ak sú všetky vlákna zamrznuté, môžete nakonfigurovať a použiť politiku odmietnutých úloh fondu vlákien. Ak sa nechcete zbaviť úloh, ktoré by ste museli použiť CallerRunsPolicy:

 execexec = nový ThreadPoolExecutor (0, 20, 20, TimeUnit.MILLISECONDS, nový SynchronousQueue () nový ThreadPoolExecutor.CallerRunsPolicy ()); 

V takom prípade, ak by zavesenie vlákna spôsobilo odmietnutie úlohy, táto úloha by sa dostala volajúcemu vláknu, ktoré sa má spracovať. Vždy existuje šanca, že táto úloha bude príliš visieť. V takom prípade by celý proces zamrzol. Je preto lepšie nepridávať takúto politiku v tejto súvislosti.

 verejná trieda NotificationProcessor implementuje Runnable {private final NotificationOriginator notificationOrginator; boolean isRunning = true; súkromný konečný ExecutorService execexec; AlarmNotificationProcessor (NotificationOriginator norginator) {// ctor // execexec = Executors.newCachedThreadPool (); // Príliš veľa vlákien // execexec = Executors.newFixedThreadPool (2); //, detekcia zablokovania úloh execexec = nový ThreadPoolExecutor (0 , 250, TimeUnit.MILLISECONDS, new SynchronousQueue (), new ThreadPoolExecutor.CallerRunsPolicy ()); } public void run () {while (isRunning) {try {final Task task = TaskQueue.INSTANCE.getTask (); Runnable thisTrap = new Runnable () {public void run () {++ alarmid; notificaionOrginator.notify (new OctetString (), // Spracovanie úlohy nbialarmnew.getOID (), nbialarmnew.createVariableBindingPayload ()); É ........}}; execexec.execute (thisTrap); } 

Vlastný ThreadPool s detekciou zablokovania

Knižnica fondu vlákien so schopnosťou detekcie a manipulácie so zablokovaním úloh by bola skvelá. Jeden som vyvinul a ukážem ho nižšie. Toto je vlastne port z fondu vlákien C ++, ktorý som navrhol a použil už dávnejšie (pozri referencie). Toto riešenie v zásade využíva vzor príkazov a vzor reťazca zodpovednosti. Implementácia príkazového vzoru v Jave bez podpory podpory objektov Function je však trochu zložitá. Z tohto dôvodu som musel implementáciu mierne zmeniť, aby som mohol používať reflexiu Java. Všimnite si, že v kontexte, v ktorom bol tento vzor navrhnutý, bol ten, kde bolo treba zapojiť / zapojiť fond vlákien bez úpravy ktorejkoľvek z existujúcich tried. (Domnievam sa, že jednou z veľkých výhod objektovo orientovaného programovania je, že nám poskytuje spôsob, ako navrhovať triedy tak, aby sme efektívne využívali zásadu otvoreného uzavretia. To platí najmä pre zložitý starý starší kód a môže mať menší význam pre vývoj nového produktu.) Preto som na implementáciu príkazového vzoru použil rozhranie namiesto použitia rozhrania. Zvyšok kódu bolo možné preniesť bez väčších zmien, pretože takmer všetky primitíva synchronizácie a signalizácie vlákien sú k dispozícii od verzie Java 1.5.

 príkaz verejnej triedy {private Object [] argParameter; ........ // Ctor pre metódu s dvoma príkazmi args (T pObj, String methodName, long timeout, String key, int arg1, int arg2) {m_objptr = pObj; m_methodName = mthodName; m_timeout = timeout; m_key = kľúč; argParameter = nový objekt [2]; argParameter [0] = arg1; argParameter [1] = arg2; } // Zavolá metódu objektu void execute () {Class klass = m_objptr.getClass (); Trieda [] paramTypes = nová Trieda [] {int.class, int.class}; try {Method methodName = klass.getMethod (m_methodName, paramTypes); //System.out.println("Found the method -> "+ methodName); if (argParameter.length == 2) {methodName.invoke (m_objptr, (Object) argParameter [0], (Object) argParameter [1]); } 

Príklad použitia tohto vzoru:

 verejná trieda CTask {.. public int DoSomething (int a, int b) {...}} 

Príkaz cmd4 = nový príkaz (task4, "DoMultiplication", 1, "key2", 2,5);

Teraz tu máme ďalšie dve dôležité triedy. Jedným z nich je ThreadChain triedy, ktorá implementuje model reťazca zodpovednosti:

 verejná trieda ThreadChain implementuje Runnable {public ThreadChain (ThreadChain p, bazén ThreadPool, názov reťazca) {AddRef (); deleteMe = false; zaneprázdnený = nepravdivý; // -> veľmi dôležité next = p; // nastaviť reťazec vlákien - všimnite si, že je to ako prepojený zoznam impl threadpool = pool; // nastavenie fondu vlákien - koreň skupiny vlákien ........ threadId = ++ ThreadId; ...... // spustiť vlákno thisThread = new Thread (this, name + inttid.toString ()); thisThread.start (); } 

Táto trieda má dve hlavné metódy. Jeden je boolovský Môže zvládnuť() ktorý iniciuje ThreadPool triedy a potom pokračuje rekurzívne. Týmto sa skontroluje, či je aktuálne vlákno (current ThreadChain inštancia) môže úlohu zvládnuť bezplatne. Ak už spracováva úlohu, zavolá ďalšiu v reťazci.

 public Boolean canHandle () {if (! busy) {// If not busy System.out.println ("Môže túto udalosť spracovať v id =" + threadId); // todo signál udalosti try {condLock.lock (); condWait.signal (); // Signalizujte HandleRequest, ktorý na to čaká v metóde spustenia .................................... ..... návrat pravdivý; } ......................................... /// Inak uvidíte, či ďalší objekt v reťazci je voľný /// na vybavenie žiadosti return next.canHandle (); 

Všimnite si, že HandleRequest je metóda ThreadChain ktorá je vyvolaná z Beh vlákna () metóda a čaká na signál z môže zvládnuť metóda. Všimnite si tiež, ako sa s úlohou zaobchádza pomocou príkazového vzoru.

$config[zx-auto] not found$config[zx-overlay] not found