FileWritingMessageHandler (int-file:outbound-channel-adapter) is slow for a single-file, multiple-messages use case
I'm using Spring 4.1.2 and Spring Integration 4.1.0.

I have a use case where I'd like to produce a single file containing a row for each message that flowed through a channel. The messages received are of type String.

The file is a nice-to-have, meaning it's not necessary for the writing of the file to happen within the same transaction as the master flow, so an async wire-tap pattern could be implemented for this use case. The messages written to the file need to be in the same order they were received in (so either one thread needs to process them, or an aggregator needs to wait for multiple threads to complete and write them in the original order).

I wanted to get a feel for the most performant way to handle this use case, so I tried a few tests. To make the tests a bit easier I'm not using an async wire-tap, but I mention it in the use case description because suggestions might involve batching/buffering solutions.
The general flow came from the "Define an Integration Flow" section of this guide: https://spring.io/guides/gs/integration/

The main options I tried were:
1. Use `int-file:outbound-channel-adapter` (which creates a `FileWritingMessageHandler`) along with a transformer that appends a newline to each message (the transformer uses the SpEL expression `payload + '#{systemProperties['line.separator']}'`). Run with `spring.expression.compiler.mode=off`.
2. Use `int-file:outbound-channel-adapter` (which creates a `FileWritingMessageHandler`) along with a transformer that appends a newline to each message (the transformer uses the SpEL expression `payload.toString() + '#{systemProperties['line.separator']}'`). Run with `spring.expression.compiler.mode=mixed`. Note: using `payload.toString()` rather than `payload` to work around the SpEL issue https://jira.spring.io/browse/SPR-12514.
3. Use `int:logging-channel-adapter` rather than `int-file:outbound-channel-adapter` (saving on having to use a transformer with a SpEL expression). Tested with log4j2 using a `RollingRandomAccessFile` appender and synchronous loggers. Run with `spring.expression.compiler.mode=off`.
4. Use `int:logging-channel-adapter` rather than `int-file:outbound-channel-adapter` (saving on having to use a transformer with a SpEL expression). Tested with log4j2 using a `RollingRandomAccessFile` appender and asynchronous loggers; see the "Making All Loggers Asynchronous" section at http://logging.apache.org/log4j/2.0/manual/async.html. Run with `spring.expression.compiler.mode=off`.
5. Use `int:logging-channel-adapter` rather than `int-file:outbound-channel-adapter` (saving on having to use a transformer with a SpEL expression). Tested with log4j2 using a `RollingRandomAccessFile` appender and asynchronous loggers; see the "Making All Loggers Asynchronous" section at http://logging.apache.org/log4j/2.0/manual/async.html. Run with `spring.expression.compiler.mode=mixed`.
Test cases 1 and 2 use this flow:

Test cases 3 through 5 use this flow:
The input file contains XML data (strings) that vary in length between 1200 and 1500 chars per line (each line is a single message). In the test there are 203,712 messages.

Below are the timings. I'm showing timings for the first few items and a bit more for the last items, since the SpEL compiler kicks in after a period of time.
| cnt     | 1: SpringInt FileAdapter, spel-compiler=off | 2: SpringInt FileAdapter, spel-compiler=mixed | 3: log4j2 RollingRandomAccessFile, sync loggers, spel-compiler=off | 4: log4j2 RollingRandomAccessFile, async loggers, spel-compiler=off | 5: log4j2 RollingRandomAccessFile, async loggers, spel-compiler=mixed |
|---------|--------------|--------------|--------------|--------------|--------------|
| 10000   | 0:00:12.670  | 0:00:17.235  | 0:00:08.222  | 0:00:01.847  | 0:00:01.320  |
| 20000   | 0:00:24.636  | 0:00:30.208  | 0:00:08.828  | 0:00:02.232  | 0:00:01.839  |
| 30000   | 0:00:36.179  | 0:00:44.300  | 0:00:09.426  | 0:00:02.512  | 0:00:02.647  |
| ...     | ...          | ...          | ...          | ...          | ...          |
| 180000  | 0:02:58.935  | 0:04:15.528  | 0:00:17.095  | 0:00:08.546  | 0:00:07.936  |
| 200000  | 0:03:16.473  | 0:04:35.582  | 0:00:18.107  | 0:00:09.548  | 0:00:08.660  |
| 203712  | 0:03:19.715  | 0:04:39.452  | 0:00:18.284  | 0:00:09.661  | 0:00:08.732  |
Take the timings with a grain of salt; I didn't run these dozens of times and take averages. I'm not advocating that log4j2 is faster than other offerings such as Logback; I'm merely using it for comparison purposes. Note: I'm using a file as input for the test. I point that out because someone might suggest having Spring Integration copy the raw file from fileA to fileB. In our real use case the messages come in via JMS, so a file-to-file solution is not a real option.
Interesting points:

- The Spring Integration `FileWritingMessageHandler` is much slower than the log4j2 offering. log4j2-async took 4.3% of the time that `FileWritingMessageHandler` did (199.715 secs for scenario 1 vs 8.732 secs for scenario 5). log4j2-sync took 4.8% of the time that `FileWritingMessageHandler` did (199.715 secs for scenario 1 vs 9.661 secs for scenario 4).
- Spring Integration `FileWritingMessageHandler` with `spring.expression.compiler.mode=mixed` (scenario 2) was slower than with `spring.expression.compiler.mode=off`. I assume that's because in scenario 1 I was able to use `payload + '#{systemProperties['line.separator']}'`, whereas in scenario 2 I had to use `payload.toString() + '#{systemProperties['line.separator']}'`.
- Scenarios 3 through 5 performed as expected, relative to the other scenarios.
Ideally I don't like having to use a `logging-channel-adapter` to write messages to a file; it seems like I'm bastardizing that component. But the performance gain is so significant that unfortunately I can't rule out using it.
My questions are:

1. What other options do I have, besides writing my own `FileWritingMessageHandler`, to get better file-writing performance?
2. I assume that if I batch or aggregate prior to the `FileWritingMessageHandler` and write out the batched group, performance might be better. I'm sure I could use a task-executor and poller (my use case would permit this). If batching should be considered an option, should `FileWritingMessageHandler` expose a `bufferSize` attribute?
3. Could `FileWritingMessageHandler` be tweaked, or perhaps additional more-specific versions be offered, to be more performant for this use case (perhaps taking advice/hints from the log4j2 loggers)? Would a file `StreamWriter` be more performant?
4. Just thinking out loud on this one: should a log4j2 "wrapper" class that acts purely as a file adapter be offered (i.e. it logs the message with no line/class/etc info, it writes regardless of level, and the user merely passes in a filename and perhaps sync/async)?
5. Can the SpEL compiler be better optimized to handle the `payload.toString() + '#{systemProperties['line.separator']}'` case, since as noted it was slower than not calling toString() in the SpEL itself?
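To make the batching idea in question 2 concrete, here is a rough JDK-only sketch (a hypothetical class, not part of Spring Integration; the batch size and file name are made up for illustration): buffer messages in memory and append each full batch to the file in a single write.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of batching before the file write: collect messages
 * and append them to the file one batch at a time instead of per message.
 */
public final class BatchingFileWriter {

    private final Path file;
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();

    public BatchingFileWriter(Path file, int batchSize) {
        this.file = file;
        this.batchSize = batchSize;
    }

    /** Buffer one message; write the whole batch when it is full. */
    public void handle(String payload) throws IOException {
        buffer.add(payload);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Append all buffered messages in a single write and clear the buffer. */
    public void flush() throws IOException {
        if (buffer.isEmpty()) {
            return;
        }
        StringBuilder sb = new StringBuilder();
        for (String line : buffer) {
            // Replaces the SpEL newline-appending transformer.
            sb.append(line).append(System.getProperty("line.separator"));
        }
        Files.write(file, sb.toString().getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        buffer.clear();
    }

    public static void main(String[] args) throws IOException {
        Path file = Paths.get("usecase-batched.txt");
        Files.deleteIfExists(file);
        BatchingFileWriter writer = new BatchingFileWriter(file, 2);
        writer.handle("a");
        writer.handle("b"); // batch of 2 is written here
        writer.handle("c");
        writer.flush();     // flush the partial batch
        System.out.println(Files.readAllLines(file));
    }
}
```

In a real flow an aggregator (or a poller on a queue channel) would play the role of `handle()`, and ordering would be preserved as long as a single thread feeds the writer.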
Below are the code/config files I used for the tests.
pom.xml
```xml
<dependencies>
    <!-- Testing -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
        <scope>test</scope>
    </dependency>
    <!-- Spring Integration -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
        <version>${spring.integration.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-file</artifactId>
        <version>${spring.integration.version}</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.7</version>
    </dependency>
    <!-- Binding for JCL (aka Java Commons Logging). -->
    <!-- Needed since things like commons libs use commons-logging, which we don't want -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>jcl-over-slf4j</artifactId>
        <version>1.7.7</version>
        <!-- Making scope runtime so we'll catch any of our own classes trying to use commons-logging when we compile -->
        <scope>runtime</scope>
    </dependency>
    <!-- Binding for log4j -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <!-- As of 9/12/2014 our company maven repos do not have 2.0.2 -->
        <version>2.0.1</version>
    </dependency>
    <!-- log4j api and core implementation required by the binding -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.0.2</version>
    </dependency>
    <!-- Async loggers for log4j2 require the LMAX disruptor, see http://logging.apache.org/log4j/2.x/manual/async.html -->
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.1</version>
    </dependency>
</dependencies>
```
Java classes
```java
package com.xxx;

import java.util.Scanner;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Starts the Spring context and initializes the Spring Integration routes.
 */
public final class Main {

    private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);

    private Main() { }

    /**
     * Load the Spring Integration application context.
     *
     * @param args - command line arguments
     */
    public static void main(final String... args) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("\n========================================================="
                      + "\n                                                         "
                      + "\n          Welcome to Spring Integration!                 "
                      + "\n                                                         "
                      + "\n    For more information please visit:                   "
                      + "\n    http://www.springsource.org/spring-integration       "
                      + "\n                                                         "
                      + "\n=========================================================");
        }

        final AbstractApplicationContext context = new ClassPathXmlApplicationContext(
                "classpath:META-INF/spring/integration/spring-integration-context-usecases.xml");
        context.registerShutdownHook();

        SpringIntegrationUtils.displayDirectories(context);

        final Scanner scanner = new Scanner(System.in);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("\n========================================================="
                      + "\n                                                         "
                      + "\n    Please press 'q + Enter' to quit the application.    "
                      + "\n                                                         "
                      + "\n=========================================================");
        }

        while (!scanner.hasNext("q")) {
            // Do nothing unless the user presses 'q' to quit.
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Exiting application...bye.");
        }
        System.exit(0);
    }
}
```

```java
package com.xxx;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.routingslip.RoutingSlipRouteStrategy;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.DestinationResolutionException;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
 * This class is only needed until a bug is fixed in Spring Integration 4.1.0.
 * See http://stackoverflow.com/questions/27171978/read-csv-file-concurrently-using-spring-integration
 * Once fixed, delete this class and use the following in the Spring context file:
 * <code>
 * <splitter input-channel="splitChannel" output-channel="executorChannel"
 *           expression="T(org.apache.commons.io.FileUtils).lineIterator(payload)"/>
 * </code>
 */
public class FileSplitter extends AbstractMessageSplitter {

    private static final Logger LOG = LoggerFactory.getLogger(FileSplitter.class);

    int counter = 0;

    StopWatch sw = new StopWatch();

    public Object splitMessage(Message<?> message) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(message.toString());
        }
        try {
            Object payload = message.getPayload();
            Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");
            return org.apache.commons.io.FileUtils.lineIterator((File) payload);
        }
        catch (IOException e) {
            String msg = "Unable to transform file: " + e.getMessage();
            LOG.error(msg);
            throw new MessageTransformationException(msg, e);
        }
    }

    @Override
    protected void produceOutput(Object result, Message<?> requestMessage) {
        Iterator<?> iterator = (Iterator<?>) result;
        sw.start();
        while (iterator.hasNext()) {
            ++counter;
            produceOutputInternal(iterator.next(), requestMessage);
            if (counter % 10000 == 0) {
                sw.split();
                System.out.println("cnt=" + counter + " : " + sw.toSplitString());
            }
        }
        sw.stop();
        System.out.println("completed");
        System.out.println("cnt=" + counter + " : " + sw.toSplitString());
    }

    private Object getOutputChannelFromRoutingSlip(Object reply, Message<?> requestMessage,
            List<?> routingSlip, AtomicInteger routingSlipIndex) {
        if (routingSlipIndex.get() >= routingSlip.size()) {
            return null;
        }
        Object path = routingSlip.get(routingSlipIndex.get());
        Object routingSlipPathValue = null;
        if (path instanceof String) {
            routingSlipPathValue = getBeanFactory().getBean((String) path);
        }
        else if (path instanceof RoutingSlipRouteStrategy) {
            routingSlipPathValue = path;
        }
        else {
            throw new IllegalArgumentException("The RoutingSlip 'path' can be of "
                    + "String or RoutingSlipRouteStrategy type, but gotten: " + path);
        }
        if (routingSlipPathValue instanceof MessageChannel) {
            routingSlipIndex.incrementAndGet();
            return routingSlipPathValue;
        }
        else {
            Object nextPath = ((RoutingSlipRouteStrategy) routingSlipPathValue).getNextPath(requestMessage, reply);
            if (nextPath != null && (!(nextPath instanceof String) || StringUtils.hasText((String) nextPath))) {
                return nextPath;
            }
            else {
                routingSlipIndex.incrementAndGet();
                return getOutputChannelFromRoutingSlip(reply, requestMessage, routingSlip, routingSlipIndex);
            }
        }
    }

    protected void produceOutputInternal(Object reply, Message<?> requestMessage) {
        MessageHeaders requestHeaders = requestMessage.getHeaders();

        Object replyChannel = null;
        if (getOutputChannel() == null) {
            Map<?, ?> routingSlipHeader = requestHeaders.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
            if (routingSlipHeader != null) {
                Assert.isTrue(routingSlipHeader.size() == 1, "The RoutingSlip header value must be a SingletonMap");
                Object key = routingSlipHeader.keySet().iterator().next();
                Object value = routingSlipHeader.values().iterator().next();
                Assert.isInstanceOf(List.class, key, "The RoutingSlip key must be a List");
                Assert.isInstanceOf(Integer.class, value, "The RoutingSlip value must be an Integer");
                List<?> routingSlip = (List<?>) key;
                AtomicInteger routingSlipIndex = new AtomicInteger((Integer) value);
                replyChannel = getOutputChannelFromRoutingSlip(reply, requestMessage, routingSlip, routingSlipIndex);
                if (replyChannel != null) {
                    //TODO migrate to the SF MessageBuilder
                    AbstractIntegrationMessageBuilder<?> builder = null;
                    if (reply instanceof Message) {
                        builder = this.getMessageBuilderFactory().fromMessage((Message<?>) reply);
                    }
                    else if (reply instanceof AbstractIntegrationMessageBuilder) {
                        builder = (AbstractIntegrationMessageBuilder<?>) reply;
                    }
                    else {
                        builder = this.getMessageBuilderFactory().withPayload(reply);
                    }
                    builder.setHeader(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
                            Collections.singletonMap(routingSlip, routingSlipIndex.get()));
                    reply = builder;
                }
            }
            if (replyChannel == null) {
                replyChannel = requestHeaders.getReplyChannel();
            }
        }

        Message<?> replyMessage = createOutputMessage(reply, requestHeaders);
        sendOutput(replyMessage, replyChannel);
    }

    private Message<?> createOutputMessage(Object output, MessageHeaders requestHeaders) {
        AbstractIntegrationMessageBuilder<?> builder = null;
        if (output instanceof Message<?>) {
            if (!this.shouldCopyRequestHeaders()) {
                return (Message<?>) output;
            }
            builder = this.getMessageBuilderFactory().fromMessage((Message<?>) output);
        }
        else if (output instanceof AbstractIntegrationMessageBuilder) {
            builder = (AbstractIntegrationMessageBuilder<?>) output;
        }
        else {
            builder = this.getMessageBuilderFactory().withPayload(output);
        }
        if (this.shouldCopyRequestHeaders()) {
            builder.copyHeadersIfAbsent(requestHeaders);
        }
        return builder.build();
    }

    private void sendOutput(Object output, Object replyChannel) {
        MessageChannel outputChannel = getOutputChannel();
        if (outputChannel != null) {
            replyChannel = outputChannel;
        }
        if (replyChannel == null) {
            throw new DestinationResolutionException("no output-channel or replyChannel header available");
        }

        if (replyChannel instanceof MessageChannel) {
            if (output instanceof Message<?>) {
                this.messagingTemplate.send((MessageChannel) replyChannel, (Message<?>) output);
            }
            else {
                this.messagingTemplate.convertAndSend((MessageChannel) replyChannel, output);
            }
        }
        else if (replyChannel instanceof String) {
            if (output instanceof Message<?>) {
                this.messagingTemplate.send((String) replyChannel, (Message<?>) output);
            }
            else {
                this.messagingTemplate.convertAndSend((String) replyChannel, output);
            }
        }
        else {
            throw new MessagingException("replyChannel must be a MessageChannel or String");
        }
    }
}
```

```java
package com.xxx;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.ApplicationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;

/**
 * Displays the names of the input and output directories.
 */
public final class SpringIntegrationUtils {

    private static final Log LOGGER = LogFactory.getLog(SpringIntegrationUtils.class);

    private SpringIntegrationUtils() { }

    /**
     * Helper method that dynamically determines and displays the input and
     * output directories as defined in the Spring Integration context.
     *
     * @param context the Spring application context
     */
    public static void displayDirectories(final ApplicationContext context) {
        final File inDir = (File) new DirectFieldAccessor(
                context.getBean(FileReadingMessageSource.class)).getPropertyValue("directory");

        final Map<String, FileWritingMessageHandler> fileWritingMessageHandlers =
                context.getBeansOfType(FileWritingMessageHandler.class);

        final List<String> outputDirectories = new ArrayList<String>();
        for (final FileWritingMessageHandler messageHandler : fileWritingMessageHandlers.values()) {
            final Expression outDir = (Expression) new DirectFieldAccessor(messageHandler)
                    .getPropertyValue("destinationDirectoryExpression");
            outputDirectories.add(outDir.getExpressionString());
        }

        final StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("\n=========================================================");
        stringBuilder.append("\n");
        stringBuilder.append("\n    Input directory is : '" + inDir.getAbsolutePath() + "'");
        for (final String outputDirectory : outputDirectories) {
            stringBuilder.append("\n    Output directory is: '" + outputDirectory + "'");
        }
        stringBuilder.append("\n\n=========================================================");
        LOGGER.info(stringBuilder.toString());
    }
}
```
log4j2.xml
config file

```xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <Appenders>
        <Console name="STDOUT" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{ISO8601} [%t] [%-5p] (%c) - %m%n" />
        </Console>
        <RollingRandomAccessFile name="fileAppenderMessages"
                fileName="c:/users/xxxxx/desktop/fileadapter-test/usecase3.txt">
            <PatternLayout pattern="%m %n" />
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <!-- The wire-tap and logging-channel-adapter in the Spring cfg file use this category name -->
        <Logger name="fileLogger" additivity="false">
            <AppenderRef ref="fileAppenderMessages" />
        </Logger>
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>
</Configuration>
```
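One of the questions above asks whether a log4j2 "wrapper" that acts purely as a file adapter should be offered. The message-only `%m %n` layout in this config is the key ingredient; a rough JDK-only analogue using `java.util.logging` (purely an illustrative sketch with a hypothetical class name, not a real offering) looks like this:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.logging.FileHandler;
import java.util.logging.Formatter;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/**
 * Hypothetical "logger as a pure file adapter" sketch: message-only layout
 * (like log4j2's "%m %n"), no level/class/time info, writes regardless of level.
 */
public final class PlainFileLogger {

    public static Logger create(String name, Path file) throws IOException {
        Logger logger = Logger.getLogger(name);
        logger.setUseParentHandlers(false);          // no console output
        FileHandler handler = new FileHandler(file.toString(), true); // append mode
        handler.setFormatter(new Formatter() {       // bare message + newline only
            @Override
            public String format(LogRecord record) {
                return record.getMessage() + System.getProperty("line.separator");
            }
        });
        logger.addHandler(handler);
        logger.setLevel(Level.ALL);                  // write regardless of level
        return logger;
    }

    public static void main(String[] args) throws IOException {
        Path file = Paths.get("usecase-jul.txt");
        Files.deleteIfExists(file);
        Logger logger = create("fileLogger", file);
        logger.info("<msg>one</msg>");
        logger.info("<msg>two</msg>");
        for (Handler h : logger.getHandlers()) {
            h.close();                               // flush and release the file
        }
        System.out.println(Files.readAllLines(file));
    }
}
```

This is only meant to show how little a "file adapter" logger needs; the log4j2 version would additionally get the async-logger machinery that made scenarios 4 and 5 fast.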
spring-integration-context-usecases.xml
file

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:inbound-channel-adapter id="fileAdapter"
                                 ref="fileReadingMessageSource"
                                 method="receive"
                                 auto-startup="true"
                                 channel="files">
        <int:poller fixed-delay="#{T(java.lang.Integer).MAX_VALUE}"/>
    </int:inbound-channel-adapter>

    <bean id="fileReadingMessageSource" class="org.springframework.integration.file.FileReadingMessageSource">
        <property name="directory" value="c:/users/xxxxx/desktop/tmg-exchange-gateway-nam/t2"/>
    </bean>

    <int:channel id="files"/>

    <int:splitter input-channel="files" output-channel="stringMessages">
        <bean class="com.xxx.FileSplitter" />
    </int:splitter>

    <int:channel id="stringMessages"/>

    <int:transformer expression="payload + '#{systemProperties['line.separator']}'"
                     output-channel="file"
                     auto-startup="true"
                     input-channel="stringMessages"/>

    <int-file:outbound-channel-adapter id="file"
                                       mode="APPEND"
                                       charset="UTF-8"
                                       directory="c:/users/xxxxx/desktop/fileadapter-test"
                                       auto-create-directory="true"
                                       filename-generator-expression="'usecase2.txt'"/>

</beans>
```
The tests can be run using these settings:

1. `java -Dspring.expression.compiler.mode=off com.xxx.Main` (leave the context file unchanged).
2. `java -Dspring.expression.compiler.mode=mixed com.xxx.Main` (change the context file to have `expression="payload.toString() + '#{systemProperties['line.separator']}'"`).
3. `java -Dspring.expression.compiler.mode=off com.xxx.Main` (comment out the transformer and outbound-channel-adapter; change the logging-channel-adapter to `auto-startup="true"`).
4. `java -Dlog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector -Dspring.expression.compiler.mode=off com.xxx.Main` (comment out the transformer and outbound-channel-adapter; change the logging-channel-adapter to `auto-startup="true"`).
5. `java -Dlog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector -Dspring.expression.compiler.mode=mixed com.xxx.Main` (comment out the transformer and outbound-channel-adapter; change the logging-channel-adapter to `auto-startup="true"`).
Thanks for the extensive analysis.

To be honest, `APPEND` mode is a relatively recent addition to the outbound adapter and has not been optimized yet.

I suspect much of the cost is because the stream is closed on each write (using `FileCopyUtils.copy()`), which flushes to disk.

We should consider an option to keep a `BufferedOutputStream` open. It's a little tricky because the adapter supports writing a different file for each message. I presume your use case writes to the same file, or to a timestamp-based file name. We could provide an optimization that keeps the file open until a request for a different file comes in, or that keeps several file buffers open.

At some point, though, we would want to flush the buffer(s) if no new messages arrive after some time elapses. That adds complexity (but not a lot).

Of course, the downside is that there is a risk of data loss in the event of a power failure while you have data buffered in memory. It's the classic tradeoff of performance vs. reliability; right now the adapter errs towards the latter.
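A minimal sketch of that keep-the-stream-open idea in plain JDK (illustrative only, not the actual adapter code; the class and file names are made up): open the writer once in append mode, buffer writes across messages, and flush only on close.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;

/**
 * Hypothetical sketch: keep one BufferedWriter open across messages instead
 * of reopening (and therefore flushing) the file for every single message.
 * A real implementation would also flush after an idle timeout and accept
 * the data-loss-on-power-failure tradeoff discussed above.
 */
public final class BufferedAppendHandler implements AutoCloseable {

    private final BufferedWriter writer;

    public BufferedAppendHandler(Path file) throws IOException {
        // Opened once in append mode; the current adapter reopens per message.
        this.writer = Files.newBufferedWriter(file,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public void handle(String payload) throws IOException {
        writer.write(payload);
        writer.write(System.getProperty("line.separator")); // replaces the SpEL transformer
    }

    @Override
    public void close() throws IOException {
        writer.close(); // flushes any buffered data to disk
    }

    public static void main(String[] args) throws IOException {
        Path file = Paths.get("usecase-buffered.txt");
        Files.deleteIfExists(file);
        try (BufferedAppendHandler handler = new BufferedAppendHandler(file)) {
            handler.handle("<msg>one</msg>");
            handler.handle("<msg>two</msg>");
        }
        List<String> lines = Files.readAllLines(file);
        System.out.println(lines.size());
        System.out.println(lines.get(0));
    }
}
```

The single-file assumption is what makes this simple; supporting a different destination file per message is where the extra bookkeeping (a map of open buffers plus eviction) would come in.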
As always, feel free to open a JIRA issue and we'll take a look.