Java - Multiple threads to read and import files in a directory
I have a Java program with 2 steps:
- read the files in a directory recursively with multiple threads (10 threads)
- send these files to a server via HTTP POST
It seems the first step works well, but the second sends the same file 10 times.
How can I correct this error?
Here is the log:
    import file: 2005_1.xml
    import file: 2005_7.xml
    import file: 2005_6.xml
    import file: 2005_10.xml
    import file: 2005_5.xml
    import file: 2005_11.xml
    import file: 2005_8.xml
    import file: 2005_2.xml
    import file: 2005_3.xml
    import file: 2005_4.xml
    result: {"filename":"2005_4.xml"
    result: {"filename":"2005_4.xml"
    ...
    response: HttpResponseProxy{HTTP/1.1 400 [
    import file: 2005_9.xml
    result: {"filename":"2005_4.xml"
    ...
    response: HttpResponseProxy{HTTP/1.1 200 [
    result: {"filename":"2005_4.xml"
    response: HttpResponseProxy{HTTP/1.1 200 [
    result: {"filename":"2005_9.xml"

And the code. Reading the files in the directory with multiple threads:
    public void listSendFilesMultiThread(final File folder) {
        ExecutorService service = Executors.newFixedThreadPool(10, getThreadFactory());
        for (final File fileEntry : folder.listFiles()) {
            Runnable r;
            r = new Runnable() {
                @Override
                public void run() {
                    if (fileEntry.isDirectory()) {
                        listSendFilesMultiThread(fileEntry);
                    } else {
                        GetThread thread = new GetThread(fileEntry, errorFilesDestDir);
                        // start the thread
                        thread.start();
                        // join the threads
                        try {
                            thread.join();
                        } catch (InterruptedException e) {
                            logger.error("InterruptedException: " + e);
                        }
                    }
                }
            };
            service.execute(r);
        }
    }

Sending the files via HTTP POST:
    static class GetThread extends Thread {
        private final CloseableHttpClient closeableHttpClient;
        private final File file;
        private final String errorFilesDestDir;
        private final PoolingHttpClientConnectionManager cm;
        private MultipartEntityBuilder builder;

        public GetThread(File file, String errorFilesDestDir) {
            cm = new PoolingHttpClientConnectionManager();
            closeableHttpClient = HttpClients.custom().setConnectionManager(cm).build();
            this.file = file;
            this.errorFilesDestDir = errorFilesDestDir;
        }

        @Override
        public void run() {
            try {
                if (file.exists() && file.length() > 0) {
                    FileBody fileBody = new FileBody(file);
                    // should create a new builder per file
                    builder = MultipartEntityBuilder.create();
                    builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
                    builder.addPart("xmlfile", fileBody);
                    HttpEntity entity = builder.build();
                    request.setEntity(entity);
                    logger.info("import file: " + file.getName());
                    CloseableHttpResponse response = closeableHttpClient.execute(request);
                    logger.info("response: {}", response.toString());
                    try {
                        entity = response.getEntity();
                        printInfo(response, entity);
                    } finally {
                        response.close();
                        closeableHttpClient.close();
                        cm.close();
                    }
                    EntityUtils.consume(entity);
                } else if (file.length() == 0) {
                    logger.error("The import XML file is empty: " + file.getAbsolutePath());
                    Files.copy(file.toPath(),
                            new File(errorFilesDestDir + file.getName()).toPath(),
                            StandardCopyOption.REPLACE_EXISTING);
                } else {
                    logger.error("The import XML file doesn't exist");
                }
            } catch (ClientProtocolException e) {
                // handle protocol errors
                logger.error("ClientProtocolException: " + e.toString());
            } catch (IOException e) {
                // handle I/O errors
                logger.error("IOException: " + e.toString());
            }
        }
    }

Connection eviction policy:
    public static class IdleConnectionMonitorThread extends Thread {
        private final HttpClientConnectionManager connMgr;
        private volatile boolean shutdown;

        public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) {
            super();
            this.connMgr = connMgr;
        }

        @Override
        public void run() {
            try {
                while (!shutdown) {
                    synchronized (this) {
                        wait(5000);
                        // close expired connections
                        connMgr.closeExpiredConnections();
                        // optionally, close connections
                        // that have been idle longer than 30 sec
                        connMgr.closeIdleConnections(30, TimeUnit.SECONDS);
                    }
                }
            } catch (InterruptedException e) {
                logger.error("InterruptedException: " + e.toString());
            }
        }

        public void shutdown() {
            shutdown = true;
            synchronized (this) {
                notifyAll();
            }
        }
    }
I recommend using the producer-consumer pattern to simplify the code. One thread or several (one should be enough, since it is not doing any processing) uses your logic to find the files to be uploaded. Next, it places them in a queue. Then start as many consumers as you wish to read records from the queue and upload them to the server.

https://dzone.com/articles/concurrency-pattern-producer

The consumers have the logic to read a file from disk and upload it to the server.
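To make the suggestion concrete, here is a minimal sketch of that layout using only the JDK. The class name `ProducerConsumerUpload`, the `Uploader` interface and the poison-pill sentinel are names made up for this illustration and are not part of the question's code. The upload step is left as a hook: in a real program it would wrap the HttpClient call from the question. The posted snippet does not show where `request` is declared; if that object is shared between threads, it could explain the same file being sent several times, so the hook should probably build a fresh request and entity for every file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ProducerConsumerUpload {

    /** Hypothetical upload hook; a real one would send one HTTP POST per file. */
    public interface Uploader {
        void upload(Path file) throws IOException;
    }

    // Sentinel ("poison pill") telling a consumer that no more files will arrive.
    private static final Path POISON = Paths.get("");

    public static void uploadAll(Path root, int consumers, Uploader uploader)
            throws IOException, InterruptedException {
        BlockingQueue<Path> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);

        // Consumers: each take() hands a path to exactly one worker,
        // so no file is processed twice.
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                try {
                    for (Path file = queue.take(); file != POISON; file = queue.take()) {
                        try {
                            uploader.upload(file);
                        } catch (IOException e) {
                            System.err.println("Upload failed for " + file + ": " + e);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Producer: the calling thread walks the directory tree recursively.
        try (Stream<Path> paths = Files.walk(root)) {
            List<Path> files = paths.filter(Files::isRegularFile).collect(Collectors.toList());
            queue.addAll(files);
        } finally {
            // One pill per consumer so that every worker terminates.
            for (int i = 0; i < consumers; i++) {
                queue.add(POISON);
            }
            pool.shutdown();
        }
        pool.awaitTermination(1, TimeUnit.HOURS);
    }

    public static void main(String[] args) throws Exception {
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        // Stand-in uploader that only logs; replace it with the real HTTP POST.
        uploadAll(root, 10, file -> System.out.println("import file: " + file.getFileName()));
    }
}
```

With this structure there is a single pool for the whole tree instead of a new 10-thread pool per directory, and no thread is started and immediately joined. Each consumer only ever works on the path it took from the queue.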