New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

io.github.nassimus26:FlowControl

Package Overview
Dependencies
Maintainers
1
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

io.github.nassimus26:FlowControl

Powerful Thread Pool Executor with Flow Control and BatchBuffer execution

  • 1.0.9
  • Source
  • Maven
  • Socket score

Version published
Maintainers
1
Source

FlowControl

Tiny but powerful Thread Pool Executor with Flow Control and BatchBuffer execution

Features :

  • Threads Exceptions handling

  • Fixed Queue Size for BackPresure control

  • Fixed BufferedBatch processing :

    • Will submit the callable when the buffer is ready (full), extremely useful to reduce Threads context switching

Main classes :

  • FlowControlExecutor

  • BufferedBatchFlowControlExecutor

Maven Dependency

<dependency>
    <groupId>io.github.nassimus26</groupId>
    <artifactId>FlowControl</artifactId>
    <version>1.0.9</version> 
</dependency>

Example :

@Test
public void testFlowControl() throws Throwable {
    int nbrRows = 2_000_000;
    final List<String> rows = new ArrayList<>();
    for (int i=0;i<nbrRows;i++)
        rows.add(generateRow(i));
    final List<String> expectedValues = new ArrayList<>();
    System.out.println("Starting 1 Thread processing...");
    long now = System.currentTimeMillis();
    for (String row : rows)
        expectedValues.add(transformRow(row));
    double processDurationWithOneThread = ((System.currentTimeMillis()-now)/1000.0);
    System.out.println("1 Thread processing takes "+ processDurationWithOneThread +" seconds");
    final List<String> result = new Vector<>();
    final AtomicBoolean isProcessingEnds = new AtomicBoolean();
    BufferedBatchFlowControlExecutor<String> processRows =
            new BufferedBatchFlowControlExecutor<String>(
                    new BufferedBatchCallable<String>() {
                        @Override
                        public void call(List<String> batchValues) {
                            ArrayList<String> tmp = new ArrayList<>();
                            for ( int i=0; i<batchValues.size(); i++ )
                                tmp.add(transformRow(batchValues.get(i)));
                            result.addAll(tmp);
                        }
                    }, 1000, BufferedBatchFlowControlExecutor.getNbCores(), 50, "processRows") {

                @Override
                public void handleException(Exception e) {
                    /* The executor will throw the exception at the end if any exception */
                }

                @Override
                public boolean isSubmitsEnds() {
                    return isProcessingEnds.get();
                }

            };

    System.out.println("Starting Parallel processing...");
    processRows.printLog(0, 100);
    now = System.currentTimeMillis();
    for (String row:rows){
        try {
            processRows.submit(row);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    isProcessingEnds.set(true);
    processRows.waitAndFlushWithException(true);
    double parallelDuration = ((System.currentTimeMillis()-now)/1000.0);
    System.out.println("Parallel processing takes "+ parallelDuration +
            " seconds ("+ (int)(processDurationWithOneThread*100/parallelDuration)+"% faster)");
    Collections.sort(expectedValues);
    Collections.sort(result);
    Assert.assertArrayEquals( expectedValues.toArray(), result.toArray() );
}

private String generateRow(int i){
    return "row_"+i;
}

private String transformRow(String row){// some CPU operations
    return row.replaceAll("row", "replaceMe").replaceAll("_", "!")
            .replaceAll("replaceMe", "ligne").replaceAll("!", "_");
}

Expected output

1 Thread processing takes 4.686 seconds
Starting Parallel processing...
Thread processRows : Actives : 0 , Queue : 0   , Speed/Sec : Current=0           -  Avg=0              , Done : 0             , 0.003    Mem (Mb): Aval=1 017 ,  Free=1562  ,  Total=2580  ,  Max=4062   
Thread processRows : Actives : 8 , Queue : 26  , Speed/Sec : Current=653 000     -  Avg=643 000        , Done : 132 000       , 0.205    Mem (Mb): Aval=304   ,  Free=2275  ,  Total=2580  ,  Max=4062   
Thread processRows : Actives : 8 , Queue : 21  , Speed/Sec : Current=2 165 000   -  Avg=1 190 000      , Done : 381 000       , 0.32     Mem (Mb): Aval=1 277 ,  Free=1302  ,  Total=2580  ,  Max=4062   
Thread processRows : Actives : 8 , Queue : 18  , Speed/Sec : Current=693 000     -  Avg=1 034 000      , Done : 483 000       , 0.467    Mem (Mb): Aval=327   ,  Free=2252  ,  Total=2580  ,  Max=4062   
Thread processRows : Actives : 8 , Queue : 21  , Speed/Sec : Current=2 078 000   -  Avg=1 221 000      , Done : 695 000       , 0.569    Mem (Mb): Aval=1 166 ,  Free=1413  ,  Total=2580  ,  Max=4062   
Thread processRows : Actives : 8 , Queue : 3   , Speed/Sec : Current=952 000     -  Avg=1 160 000      , Done : 854 000       , 0.736    Mem (Mb): Aval=364   ,  Free=2215  ,  Total=2580  ,  Max=4062   
Thread processRows : Actives : 8 , Queue : 23  , Speed/Sec : Current=2 363 000   -  Avg=1 316 000      , Done : 1 114 000     , 0.846    Mem (Mb): Aval=1 408 ,  Free=1171  ,  Total=2580  ,  Max=4062   
Thread processRows : Actives : 8 , Queue : 20  , Speed/Sec : Current=575 000     -  Avg=1 191 000      , Done : 1 213 000     , 1.018    Mem (Mb): Aval=380   ,  Free=2547  ,  Total=2928  ,  Max=4062   
Thread processRows : Actives : 6 , Queue : 0   , Speed/Sec : Current=953 000     -  Avg=1 168 000      , Done : 1 315 000     , 1.125    Mem (Mb): Aval=798   ,  Free=2130  ,  Total=2928  ,  Max=4062   
Thread processRows : Actives : 8 , Queue : 19  , Speed/Sec : Current=2 355 000   -  Avg=1 269 000      , Done : 1 560 000     , 1.229    Mem (Mb): Aval=1 784 ,  Free=1144  ,  Total=2928  ,  Max=4062   
Thread processRows : Actives : 6 , Queue : 0   , Speed/Sec : Current=437 000     -  Avg=1 185 000      , Done : 1 620 000     , 1.366    Mem (Mb): Aval=401   ,  Free=2526  ,  Total=2928  ,  Max=4062   
Thread processRows : Actives : 8 , Queue : 20  , Speed/Sec : Current=2 152 000   -  Avg=1 254 000      , Done : 1 846 000     , 1.471    Mem (Mb): Aval=1 320 ,  Free=1608  ,  Total=2928  ,  Max=4062   
Thread processRows : Actives : 0 , Queue : 0   , Speed/Sec : Current=2 406 000   -  Avg=1 302 000      , Done : 2 000 000     , 1.535    Mem (Mb): Aval=1 923 ,  Free=1004  ,  Total=2928  ,  Max=4062   
Parallel processing takes 1.534 seconds (305% faster)

FAQs

Package last updated on 03 Jan 2019

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc