java - FileWritingMessageHandler (int-file:outbound-channel-adapter) is slow for a single-file, multiple-messages use case


I'm using Spring 4.1.2 and Spring Integration 4.1.0.

I have a use case where I'd like to produce a single file containing a row for each message that flows through a channel. The messages received are of type String. The file is a nice-to-have, meaning it isn't necessary to write it within the same transaction as the master flow, so an async wire-tap pattern is implemented for this use case. The messages must be written to the file in the same order they were received (so either one thread needs to process them, or an aggregator needs to wait for multiple threads to complete and then write them in the original order).
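For reference, the async wire-tap wiring looks roughly like this (channel and executor names here are simplified/hypothetical; a pool-size of 1 on the executor is what preserves arrival order):

```xml
<!-- tap the main flow; copies go to fileTapChannel without blocking the master flow -->
<int:channel id="mainFlow">
    <int:interceptors>
        <int:wire-tap channel="fileTapChannel"/>
    </int:interceptors>
</int:channel>

<!-- hand off to a single thread so lines are written in arrival order -->
<int:channel id="fileTapChannel">
    <int:dispatcher task-executor="fileWriterExecutor"/>
</int:channel>

<task:executor id="fileWriterExecutor" pool-size="1" queue-capacity="10000"/>
```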

I wanted to get a feel for the most performant way to handle this use case, so I tried a few tests. To make the tests a bit easier I am not using the async wire-tap (but I mention it in the use case because suggestions might involve batching/buffering solutions).

The general flow came from the "Define an Integration Flow" section of this guide: https://spring.io/guides/gs/integration/

The main options I tried were:

  1. Use int-file:outbound-channel-adapter (which creates a FileWritingMessageHandler) along with a transformer that appends a newline to each message (the transformer uses the SpEL expression payload + '#{systemProperties['line.separator']}').
    spring.expression.compiler.mode=off
  2. Use int-file:outbound-channel-adapter (which creates a FileWritingMessageHandler) along with a transformer that appends a newline to each message (the transformer uses the SpEL expression payload.toString() + '#{systemProperties['line.separator']}').
    spring.expression.compiler.mode=mixed
    Note: using payload.toString() rather than payload works around this SpEL issue: https://jira.spring.io/browse/SPR-12514
  3. Use int:logging-channel-adapter rather than int-file:outbound-channel-adapter (saving us from having to use a transformer with a SpEL expression).
    Tested with Log4j2 using a RollingRandomAccessFile and synchronous loggers.
    spring.expression.compiler.mode=off
  4. Use int:logging-channel-adapter rather than int-file:outbound-channel-adapter (saving us from having to use a transformer with a SpEL expression).
    Tested with Log4j2 using a RollingRandomAccessFile and asynchronous loggers. See "Making All Loggers Asynchronous" at http://logging.apache.org/log4j/2.0/manual/async.html
    spring.expression.compiler.mode=off
  5. Use int:logging-channel-adapter rather than int-file:outbound-channel-adapter (saving us from having to use a transformer with a SpEL expression).
    Tested with Log4j2 using a RollingRandomAccessFile and asynchronous loggers. See "Making All Loggers Asynchronous" at http://logging.apache.org/log4j/2.0/manual/async.html
    spring.expression.compiler.mode=mixed

Test cases 1 and 2 use the int-file:outbound-channel-adapter flow.

Test cases 3 through 5 use the int:logging-channel-adapter flow.

The input file contains XML data (strings) that vary in length between 1,200 and 1,500 chars per line (each line is a single message).
In this test there are 203,712 messages.
Below are the timings. I'm showing the timings of the first items in a bit more detail than the last items, since the SpEL compiler kicks in after a period of time.

    |           1             |            2             |               3                |               4                |               5                |
    | SpringInt FileAdapter   | SpringInt FileAdapter    | Log4j2 RollingRandomAccessFile | Log4j2 RollingRandomAccessFile | Log4j2 RollingRandomAccessFile |
    |                         |                          | sync loggers                   | async loggers                  | async loggers                  |
    | spel-compiler=off       | spel-compiler=mixed      | spel-compiler=off              | spel-compiler=off              | spel-compiler=mixed            |
    |-------------------------|--------------------------|--------------------------------|--------------------------------|--------------------------------|
    | cnt=10000 : 0:00:12.670 | cnt=10000 : 0:00:17.235  | cnt=10000 : 0:00:08.222        | cnt=10000 : 0:00:01.847        | cnt=10000 : 0:00:01.320        |
    | cnt=20000 : 0:00:24.636 | cnt=20000 : 0:00:30.208  | cnt=20000 : 0:00:08.828        | cnt=20000 : 0:00:02.232        | cnt=20000 : 0:00:01.839        |
    | cnt=30000 : 0:00:36.179 | cnt=30000 : 0:00:44.300  | cnt=30000 : 0:00:09.426        | cnt=30000 : 0:00:02.512        | cnt=30000 : 0:00:02.647        |
    | ...                     | ...                      | ...                            | ...                            | ...                            |
    | cnt=180000 : 0:02:58.935| cnt=180000 : 0:04:15.528 | cnt=180000 : 0:00:17.095       | cnt=180000 : 0:00:08.546       | cnt=180000 : 0:00:07.936       |
    | cnt=200000 : 0:03:16.473| cnt=200000 : 0:04:35.582 | cnt=200000 : 0:00:18.107       | cnt=200000 : 0:00:09.548       | cnt=200000 : 0:00:08.660       |
    | cnt=203712 : 0:03:19.715| cnt=203712 : 0:04:39.452 | cnt=203712 : 0:00:18.284       | cnt=203712 : 0:00:09.661       | cnt=203712 : 0:00:08.732       |

Take the timings with a grain of salt - I didn't run these dozens of times and take averages. I'm not advocating that Log4j2 is faster than other offerings such as Logback; I'm merely using it for comparison purposes. Note: I'm using a file as input for the test. I point this out because someone might suggest having Spring Integration copy the raw file from fileA to fileB. In our real use case the messages come in via JMS, so a file-to-file solution is not a real option.
Interesting points:

  • The Spring Integration FileWritingMessageHandler is much slower than the Log4j2 offering.
    Log4j2-async took 4.3% of the time FileWritingMessageHandler did (199.715 secs for scenario 1 vs 8.732 secs for scenario 5).
    Log4j2-sync took 4.8% of the time FileWritingMessageHandler did (199.715 secs for scenario 1 vs 9.661 secs for scenario 4).
  • The Spring Integration FileWritingMessageHandler with spring.expression.compiler.mode=mixed (scenario #2) was slower than with spring.expression.compiler.mode=off. I assume that's because in scenario #1 I was able to use payload + '#{systemProperties['line.separator']}' whereas in scenario #2 I had to use payload.toString() + '#{systemProperties['line.separator']}'.
  • Scenarios 3 through 5 performed as expected, relative to the other scenarios.

Ideally I'd rather not have to use the logging-channel-adapter to write messages to a file - it seems like I'm bastardizing that component. But the performance gain is so significant that unfortunately I can't rule out using it.
My questions are:

  • What other options do I have, besides writing my own FileWritingMessageHandler, for better file-writing performance?
  • I assume that if I batch or aggregate before the FileWritingMessageHandler and write out the batched group, performance might be better. I'm sure I could use a task-executor and poller (my use case would permit this). If batching should be considered an option, should FileWritingMessageHandler expose a bufferSize attribute?
  • Could FileWritingMessageHandler be tweaked, or perhaps additional, more-specific versions be offered that are more performant for this use case (perhaps taking advice/hints from the Log4j2 loggers)?
  • Would a file StreamWriter be more performant?
  • Just thinking out loud on this one: should a Log4j2 "wrapper" class that acts purely as a file adapter be offered (i.e. log the message with no line/class/etc. info, always write regardless of level, and the user merely passes in a filename and perhaps sync/async)?
  • Can the SpEL compiler be better optimized to handle the payload.toString() + '#{systemProperties['line.separator']}' case, since as noted it is slower than not calling toString() in the SpEL itself?
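To put some numbers behind the "would a StreamWriter be more performant?" question, here is a standalone micro-benchmark, independent of Spring Integration (class and method names are made up for this sketch): it compares opening/closing the file for every line - roughly what a per-message append handler does - against keeping one buffered writer open for the whole run.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class AppendTiming {

    // Open, append one line, close - roughly what a per-message append handler does.
    static void reopenPerLine(Path file, int lines) {
        for (int i = 0; i < lines; i++) {
            try (BufferedWriter w = Files.newBufferedWriter(file,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
                w.write("message " + i);
                w.newLine();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Keep one buffered writer open for the whole run of messages.
    static void keepOpen(Path file, int lines) {
        try (BufferedWriter w = Files.newBufferedWriter(file,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            for (int i = 0; i < lines; i++) {
                w.write("message " + i);
                w.newLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Helpers that wrap checked IO exceptions for convenience.
    static Path tempFile() {
        try {
            return Files.createTempFile("append-timing", ".txt");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static long lineCount(Path file) {
        try {
            return Files.readAllLines(file).size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path a = tempFile();
        long t0 = System.nanoTime();
        reopenPerLine(a, 20_000);
        long t1 = System.nanoTime();
        Path b = tempFile();
        long t2 = System.nanoTime();
        keepOpen(b, 20_000);
        long t3 = System.nanoTime();
        System.out.println("reopen-per-line: " + (t1 - t0) / 1_000_000 + " ms");
        System.out.println("keep-open      : " + (t3 - t2) / 1_000_000 + " ms");
    }
}
```

On my tests the keep-open variant is dramatically faster, which is consistent with the gap between the file adapter and Log4j2's buffered appenders above.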

Below are the code/config files used in the tests.

pom.xml

    <dependencies>
        <!-- testing -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- spring integration -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>${spring.integration.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
            <version>${spring.integration.version}</version>
        </dependency>

        <!-- logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.7</version>
        </dependency>
        <!-- binding for JCL (aka Java Common Logging) -->
        <!-- needed since things like commons libs use commons-logging, which we don't want -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>1.7.7</version>
            <!-- by making the scope runtime we'll catch any of our own classes that try to use commons-logging when we compile -->
            <scope>runtime</scope>
        </dependency>
        <!-- binding for log4j -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <!-- as of 9/12/2014 our company maven repos did not have 2.0.2 -->
            <version>2.0.1</version>
        </dependency>
        <!-- log4j api and core implementation required for binding -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.0.2</version>
        </dependency>
        <!-- async loggers for log4j2 require the LMAX disruptor, see http://logging.apache.org/log4j/2.x/manual/async.html -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.1</version>
        </dependency>
    </dependencies>

Java classes

    package com.xxx;

    import java.util.Scanner;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.support.AbstractApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;

    /**
     * Starts the Spring context and initializes the Spring Integration routes.
     */
    public final class Main {

        private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);

        private Main() {
        }

        /**
         * Load the Spring Integration application context.
         *
         * @param args - command line arguments
         */
        public static void main(final String... args) {

            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("\n========================================================="
                        + "\n                                                         "
                        + "\n          Welcome to Spring Integration!                 "
                        + "\n                                                         "
                        + "\n    For more information please visit:                   "
                        + "\n    http://www.springsource.org/spring-integration       "
                        + "\n                                                         "
                        + "\n=========================================================");
            }

            final AbstractApplicationContext context = new ClassPathXmlApplicationContext(
                    "classpath:META-INF/spring/integration/spring-integration-context-usecases.xml");

            context.registerShutdownHook();

            SpringIntegrationUtils.displayDirectories(context);

            final Scanner scanner = new Scanner(System.in);

            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("\n========================================================="
                        + "\n                                                         "
                        + "\n    Please press 'q + Enter' to quit the application.    "
                        + "\n                                                         "
                        + "\n=========================================================");
            }

            while (!scanner.hasNext("q")) {
                // do nothing unless the user presses 'q' to quit
            }

            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Exiting application...bye.");
            }

            System.exit(0);
        }
    }

    package com.xxx;

    import java.io.File;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.commons.lang3.time.StopWatch;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.integration.IntegrationMessageHeaderAccessor;
    import org.springframework.integration.routingslip.RoutingSlipRouteStrategy;
    import org.springframework.integration.splitter.AbstractMessageSplitter;
    import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
    import org.springframework.integration.transformer.MessageTransformationException;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.MessagingException;
    import org.springframework.messaging.core.DestinationResolutionException;
    import org.springframework.util.Assert;
    import org.springframework.util.StringUtils;

    /**
     * This class is needed until a bug is fixed in Spring Integration 4.1.0.
     * See http://stackoverflow.com/questions/27171978/read-csv-file-concurrently-using-spring-integration
     * Once fixed, delete this class and use the following in the Spring context file:
     * <code>
     * <splitter input-channel="splitChannel" output-channel="executorChannel" expression="T(org.apache.commons.io.FileUtils).lineIterator(payload)"/>
     * </code>
     */
    public class FileSplitter extends AbstractMessageSplitter {

        private static final Logger LOG = LoggerFactory.getLogger(FileSplitter.class);

        int counter = 0;
        StopWatch sw = new StopWatch();

        public Object splitMessage(Message<?> message) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(message.toString());
            }
            try {
                Object payload = message.getPayload();
                Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");
                return org.apache.commons.io.FileUtils.lineIterator((File) payload);
            } catch (IOException e) {
                String msg = "Unable to transform file: " + e.getMessage();
                LOG.error(msg);
                throw new MessageTransformationException(msg, e);
            }
        }

        @Override
        protected void produceOutput(Object result, Message<?> requestMessage) {
            Iterator<?> iterator = (Iterator<?>) result;
            sw.start();
            while (iterator.hasNext()) {
                ++counter;
                produceOutputInternal(iterator.next(), requestMessage);
                if (counter % 10000 == 0) {
                    sw.split();
                    System.out.println("cnt=" + counter + " : " + sw.toSplitString());
                }
            }
            sw.stop();
            System.out.println("completed");
            System.out.println("cnt=" + counter + " : " + sw.toSplitString());
        }

        private Object getOutputChannelFromRoutingSlip(Object reply, Message<?> requestMessage,
                List<?> routingSlip, AtomicInteger routingSlipIndex) {
            if (routingSlipIndex.get() >= routingSlip.size()) {
                return null;
            }

            Object path = routingSlip.get(routingSlipIndex.get());
            Object routingSlipPathValue = null;

            if (path instanceof String) {
                routingSlipPathValue = getBeanFactory().getBean((String) path);
            } else if (path instanceof RoutingSlipRouteStrategy) {
                routingSlipPathValue = path;
            } else {
                throw new IllegalArgumentException("The RoutingSlip 'path' can be of "
                        + "String or RoutingSlipRouteStrategy type, but got: " + path);
            }

            if (routingSlipPathValue instanceof MessageChannel) {
                routingSlipIndex.incrementAndGet();
                return routingSlipPathValue;
            } else {
                Object nextPath = ((RoutingSlipRouteStrategy) routingSlipPathValue).getNextPath(requestMessage, reply);
                if (nextPath != null && (!(nextPath instanceof String) || StringUtils.hasText((String) nextPath))) {
                    return nextPath;
                } else {
                    routingSlipIndex.incrementAndGet();
                    return getOutputChannelFromRoutingSlip(reply, requestMessage, routingSlip, routingSlipIndex);
                }
            }
        }

        protected void produceOutputInternal(Object reply, Message<?> requestMessage) {
            MessageHeaders requestHeaders = requestMessage.getHeaders();

            Object replyChannel = null;
            if (getOutputChannel() == null) {
                Map<?, ?> routingSlipHeader = requestHeaders.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
                if (routingSlipHeader != null) {
                    Assert.isTrue(routingSlipHeader.size() == 1, "The RoutingSlip header value must be a SingletonMap");
                    Object key = routingSlipHeader.keySet().iterator().next();
                    Object value = routingSlipHeader.values().iterator().next();
                    Assert.isInstanceOf(List.class, key, "The RoutingSlip key must be a List");
                    Assert.isInstanceOf(Integer.class, value, "The RoutingSlip value must be an Integer");
                    List<?> routingSlip = (List<?>) key;
                    AtomicInteger routingSlipIndex = new AtomicInteger((Integer) value);
                    replyChannel = getOutputChannelFromRoutingSlip(reply, requestMessage, routingSlip, routingSlipIndex);
                    if (replyChannel != null) {
                        //TODO migrate to SF MessageBuilder
                        AbstractIntegrationMessageBuilder<?> builder = null;
                        if (reply instanceof Message) {
                            builder = this.getMessageBuilderFactory().fromMessage((Message<?>) reply);
                        } else if (reply instanceof AbstractIntegrationMessageBuilder) {
                            builder = (AbstractIntegrationMessageBuilder<?>) reply;
                        } else {
                            builder = this.getMessageBuilderFactory().withPayload(reply);
                        }
                        builder.setHeader(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
                                Collections.singletonMap(routingSlip, routingSlipIndex.get()));
                        reply = builder;
                    }
                }

                if (replyChannel == null) {
                    replyChannel = requestHeaders.getReplyChannel();
                }
            }

            Message<?> replyMessage = createOutputMessage(reply, requestHeaders);
            sendOutput(replyMessage, replyChannel);
        }

        private Message<?> createOutputMessage(Object output, MessageHeaders requestHeaders) {
            AbstractIntegrationMessageBuilder<?> builder = null;
            if (output instanceof Message<?>) {
                if (!this.shouldCopyRequestHeaders()) {
                    return (Message<?>) output;
                }
                builder = this.getMessageBuilderFactory().fromMessage((Message<?>) output);
            } else if (output instanceof AbstractIntegrationMessageBuilder) {
                builder = (AbstractIntegrationMessageBuilder<?>) output;
            } else {
                builder = this.getMessageBuilderFactory().withPayload(output);
            }
            if (this.shouldCopyRequestHeaders()) {
                builder.copyHeadersIfAbsent(requestHeaders);
            }
            return builder.build();
        }

        private void sendOutput(Object output, Object replyChannel) {
            MessageChannel outputChannel = getOutputChannel();
            if (outputChannel != null) {
                replyChannel = outputChannel;
            }
            if (replyChannel == null) {
                throw new DestinationResolutionException("no output-channel or replyChannel header available");
            }

            if (replyChannel instanceof MessageChannel) {
                if (output instanceof Message<?>) {
                    this.messagingTemplate.send((MessageChannel) replyChannel, (Message<?>) output);
                } else {
                    this.messagingTemplate.convertAndSend((MessageChannel) replyChannel, output);
                }
            } else if (replyChannel instanceof String) {
                if (output instanceof Message<?>) {
                    this.messagingTemplate.send((String) replyChannel, (Message<?>) output);
                } else {
                    this.messagingTemplate.convertAndSend((String) replyChannel, output);
                }
            } else {
                throw new MessagingException("replyChannel must be a MessageChannel or String");
            }
        }
    }

    package com.xxx;

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.beans.DirectFieldAccessor;
    import org.springframework.context.ApplicationContext;
    import org.springframework.expression.Expression;
    import org.springframework.integration.file.FileReadingMessageSource;
    import org.springframework.integration.file.FileWritingMessageHandler;

    /**
     * Displays the names of the input and output directories.
     */
    public final class SpringIntegrationUtils {

        private static final Log LOGGER = LogFactory.getLog(SpringIntegrationUtils.class);

        private SpringIntegrationUtils() { }

        /**
         * Helper method to dynamically determine and display the input and output
         * directories defined in the Spring Integration context.
         *
         * @param context the Spring application context
         */
        public static void displayDirectories(final ApplicationContext context) {

            final File inDir = (File) new DirectFieldAccessor(
                    context.getBean(FileReadingMessageSource.class)).getPropertyValue("directory");

            final Map<String, FileWritingMessageHandler> fileWritingMessageHandlers =
                    context.getBeansOfType(FileWritingMessageHandler.class);

            final List<String> outputDirectories = new ArrayList<String>();

            for (final FileWritingMessageHandler messageHandler : fileWritingMessageHandlers.values()) {
                final Expression outDir = (Expression) new DirectFieldAccessor(messageHandler)
                        .getPropertyValue("destinationDirectoryExpression");
                outputDirectories.add(outDir.getExpressionString());
            }

            final StringBuilder stringBuilder = new StringBuilder();

            stringBuilder.append("\n=========================================================");
            stringBuilder.append("\n");
            stringBuilder.append("\n    Input directory is : '" + inDir.getAbsolutePath() + "'");

            for (final String outputDirectory : outputDirectories) {
                stringBuilder.append("\n    Output directory is: '" + outputDirectory + "'");
            }

            stringBuilder.append("\n\n=========================================================");

            LOGGER.info(stringBuilder.toString());
        }
    }

log4j2.xml config file

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration>
        <Appenders>
            <Console name="STDOUT" target="SYSTEM_OUT">
                <PatternLayout pattern="%d{ISO8601} [%t] [%-5p] (%c) - %m%n" />
            </Console>

            <RollingRandomAccessFile name="fileAppenderMessages" fileName="c:/users/xxxxx/desktop/fileadapter-test/usecase3.txt">
                <PatternLayout pattern="%m %n" />
            </RollingRandomAccessFile>
        </Appenders>

        <Loggers>
            <!-- the wire-tap and logging-channel-adapter in the Spring cfg file use this category name -->
            <Logger name="fileLogger" additivity="false">
                <AppenderRef ref="fileAppenderMessages" />
            </Logger>

            <Root level="info">
                <AppenderRef ref="STDOUT" />
            </Root>
        </Loggers>
    </Configuration>

spring-integration-context-usecases.xml file

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:int-file="http://www.springframework.org/schema/integration/file"
        xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
        xmlns:batch="http://www.springframework.org/schema/batch"
        xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

        <int:inbound-channel-adapter id="fileAdapter" ref="fileReadingMessageSource" method="receive" auto-startup="true" channel="files">
            <int:poller fixed-delay="#{T(java.lang.Integer).MAX_VALUE}"/>
        </int:inbound-channel-adapter>

        <bean id="fileReadingMessageSource" class="org.springframework.integration.file.FileReadingMessageSource">
            <property name="directory" value="c:/users/xxxxx/desktop/tmg-exchange-gateway-nam/t2"/>
        </bean>

        <int:channel id="files"/>

        <int:splitter input-channel="files" output-channel="stringMessages">
            <bean class="com.xxx.FileSplitter" />
        </int:splitter>

        <int:channel id="stringMessages"/>

        <int:transformer expression="payload + '#{systemProperties['line.separator']}'" output-channel="file" auto-startup="true" input-channel="stringMessages"/>

        <int-file:outbound-channel-adapter id="file"
                mode="APPEND"
                charset="UTF-8"
                directory="c:/users/xxxxx/desktop/fileadapter-test"
                auto-create-directory="true"
                filename-generator-expression="'usecase2.txt'"/>
    </beans>

The tests can be run using these settings:

    1. java -Dspring.expression.compiler.mode=off com.xxx.Main
       Leave the context file unchanged.
    2. java -Dspring.expression.compiler.mode=mixed com.xxx.Main
       Change the context file to have expression="payload.toString() + '#{systemProperties['line.separator']}'"
    3. java -Dspring.expression.compiler.mode=off com.xxx.Main
       Comment out the transformer and outbound-channel-adapter. Change the logging-channel-adapter to auto-startup="true"
    4. java -Dlog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector -Dspring.expression.compiler.mode=off com.xxx.Main
       Comment out the transformer and outbound-channel-adapter. Change the logging-channel-adapter to auto-startup="true"
    5. java -Dlog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector -Dspring.expression.compiler.mode=mixed com.xxx.Main
       Comment out the transformer and outbound-channel-adapter. Change the logging-channel-adapter to auto-startup="true"

Thanks for the extensive analysis.

To be honest, the append mode is a relatively recent addition to the outbound adapter and has not been optimized.

I suspect the cost is because the stream is closed on each write (it uses FileCopyUtils.copy()), which flushes to disk.

We should consider an option to keep a BufferedOutputStream open. It's a little tricky because the adapter supports writing to a different file for each message. I presume your use case is to write to the same file, or perhaps a timestamp-based file name. We could provide an optimization that keeps the file open until a request for a different file comes in, or keeps several file buffers open.
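A minimal sketch of that idea, stripped of the Spring Integration plumbing (the class name and API here are hypothetical, not the actual adapter): keep one buffered writer open per destination file, and flush/close explicitly rather than per message.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the buffering idea: one open BufferedWriter per
// destination file, flushed/closed explicitly rather than after each message.
public class CachingAppender implements AutoCloseable {

    private final Map<Path, BufferedWriter> writers = new HashMap<>();

    public synchronized void append(Path file, String line) {
        try {
            BufferedWriter w = writers.get(file);
            if (w == null) {
                w = Files.newBufferedWriter(file,
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
                writers.put(file, w);
            }
            w.write(line);
            w.newLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // An idle-timeout task could call this periodically to bound data loss.
    public synchronized void flush() {
        try {
            for (BufferedWriter w : writers.values()) {
                w.flush();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public synchronized void close() {
        try {
            for (BufferedWriter w : writers.values()) {
                w.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            writers.clear();
        }
    }

    // Demo helpers (not part of the sketch proper).
    static Path tempFile() {
        try {
            return Files.createTempFile("caching-appender", ".txt");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<String> readBack(Path file) {
        try {
            return Files.readAllLines(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The real adapter would also have to decide when to evict a writer (different file requested, idle timeout), which is where the complexity mentioned below comes in.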

At some point, though, you'd want to flush the buffer(s) if no new messages arrive after some time elapses. That adds complexity (but not a lot).

Of course, the downside is the risk of data loss in the event of a power failure while data is buffered in memory. It's the classic tradeoff - performance vs. reliability; right now the adapter errs towards the latter.

As always, feel free to open a JIRA issue and we'll take a look.

