Skip to content

Commit 3a380e2

Browse files
author
sa-buc
committed
add new interfaces to support peer node and abort
1 parent 774a4af commit 3a380e2

File tree

3 files changed

+454
-4
lines changed

3 files changed

+454
-4
lines changed

example/simple-table-demo/src/main/java/com/oceanbase/example/ObDirectLoadDemo.java

Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadConnection;
2222
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadManager;
2323
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadStatement;
24+
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadTraceId;
2425
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException;
2526
import com.alipay.oceanbase.rpc.direct_load.execution.ObDirectLoadStatementExecutionId;
27+
import com.alipay.oceanbase.rpc.direct_load.protocol.payload.ObTableLoadClientStatus;
2628
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObLoadDupActionType;
2729
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
2830
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType;
@@ -53,6 +55,10 @@ public static void main(String[] args) {
5355
SimpleTest.run();
5456
ParallelWriteTest.run();
5557
MultiNodeWriteTest.run();
58+
MultiPeerNodeWriteTest.run();
59+
SendHeartBeatTest.run();
60+
AbortBeforeCommitTest.run();
61+
AbortAfterCommitAsyncTest.run();
5662
}
5763

5864
private static void prepareTestTable() throws Exception {
@@ -309,4 +315,303 @@ public static void run() {
309315

310316
};
311317

318+
private static class MultiPeerNodeWriteTest {
319+
320+
private static class MultiPeerNodeWriter implements Runnable {
321+
322+
private final byte[] executionIdBytes;
323+
private final int id;
324+
private final long uniqueId;
325+
private final long sequence;
326+
boolean isPrimary;
327+
Thread[] threads;
328+
329+
MultiPeerNodeWriter(byte[] executionIdBytes, long uniqueId, long sequence, int id) {
330+
this.executionIdBytes = executionIdBytes;
331+
this.id = id;
332+
this.uniqueId = uniqueId;
333+
this.sequence = sequence;
334+
}
335+
336+
MultiPeerNodeWriter(byte[] executionIdBytes, long uniqueId, long sequence, int id, Thread[] threads) {
337+
this.executionIdBytes = executionIdBytes;
338+
this.id = id;
339+
this.uniqueId = uniqueId;
340+
this.sequence = sequence;
341+
this.isPrimary = true;
342+
this.threads = threads;
343+
}
344+
345+
@Override
346+
public void run() {
347+
ObDirectLoadConnection connection = null;
348+
ObDirectLoadStatement statement = null;
349+
try {
350+
ObDirectLoadStatementExecutionId executionId = new ObDirectLoadStatementExecutionId();
351+
ObDirectLoadTraceId trace_id = new ObDirectLoadTraceId(uniqueId, sequence);
352+
executionId.decode(executionIdBytes);
353+
354+
connection = buildConnection(1);
355+
statement = buildStatement(connection);
356+
357+
statement.resumePeer(executionId, trace_id);
358+
359+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
360+
ObObj[] rowObjs = new ObObj[2];
361+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
362+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
363+
bucket.addRow(rowObjs);
364+
statement.write(bucket);
365+
366+
if (isPrimary) {
367+
for (int i = 0; i < threads.length; ++i) {
368+
threads[i].join();
369+
}
370+
statement.commit();
371+
}
372+
373+
} catch (Exception e) {
374+
throw new RuntimeException(e);
375+
} finally {
376+
if (null != statement) {
377+
statement.close();
378+
}
379+
if (null != connection) {
380+
connection.close();
381+
}
382+
}
383+
}
384+
385+
};
386+
387+
public static void run() {
388+
System.out.println("MultiPeerNodeWriteTest start");
389+
final int writeThreadNum = 10;
390+
ObDirectLoadConnection connection = null;
391+
ObDirectLoadStatement statement = null;
392+
try {
393+
prepareTestTable();
394+
395+
connection = buildConnection(1);
396+
statement = buildStatement(connection);
397+
398+
statement.begin();
399+
400+
ObDirectLoadStatementExecutionId executionId = statement.getExecutionId();
401+
byte[] executionIdBytes = executionId.encode();
402+
403+
Thread[] threads = new Thread[writeThreadNum];
404+
for (int i = 0; i < threads.length; ++i) {
405+
MultiPeerNodeWriter multiPeerNodeWriter = new MultiPeerNodeWriter(executionIdBytes, i, i, i);
406+
Thread thread = new Thread(multiPeerNodeWriter);
407+
threads[i] = thread;
408+
}
409+
MultiPeerNodeWriter multiPeerNodeWriter = new MultiPeerNodeWriter(executionIdBytes, writeThreadNum, writeThreadNum, writeThreadNum, threads);
410+
Thread primaryThread = new Thread(multiPeerNodeWriter);
411+
primaryThread.start();
412+
for (int i = 0; i < threads.length; ++i) {
413+
threads[i].start();
414+
}
415+
primaryThread.join();
416+
queryTestTable(writeThreadNum + 1);
417+
418+
} catch (Exception e) {
419+
throw new RuntimeException(e);
420+
} finally {
421+
if (null != statement) {
422+
statement.close();
423+
}
424+
if (null != connection) {
425+
connection.close();
426+
}
427+
}
428+
System.out.println("MultiPeerNodeWriteTest successful");
429+
}
430+
431+
};
432+
433+
private static class SendHeartBeatTest {
434+
435+
private static class SendHeartBeat implements Runnable {
436+
437+
private final ObDirectLoadConnection connection;
438+
private final ObDirectLoadStatement statement;
439+
public boolean isCancel;
440+
441+
SendHeartBeat(ObDirectLoadConnection connection, ObDirectLoadStatement statement) {
442+
this.connection = connection;
443+
this.statement = statement;
444+
isCancel = false;
445+
}
446+
447+
public void stopHeartBeat() {
448+
isCancel = true;
449+
}
450+
451+
@Override
452+
public void run() {
453+
System.out.println("prepare run heartbeat");
454+
try {
455+
while (true) {
456+
synchronized (this) {
457+
if (isCancel) {
458+
break;
459+
} else {
460+
ObTableLoadClientStatus status = statement.sendHeartBeat();
461+
switch (status) {
462+
case INITIALIZING:
463+
case WAITTING:
464+
case RUNNING:
465+
case COMMITTING:
466+
case COMMIT:
467+
Thread.sleep(10); //10ms
468+
break;
469+
default:
470+
System.out.println("statement server status is unexpected " + status);
471+
throw new RuntimeException("status" + status);
472+
}
473+
}
474+
}
475+
}
476+
} catch (Exception e) {
477+
throw new RuntimeException(e);
478+
} finally {
479+
if (null != statement) {
480+
statement.close();
481+
}
482+
if (null != connection) {
483+
connection.close();
484+
}
485+
}
486+
}
487+
488+
};
489+
490+
public static void run() {
491+
System.out.println("SendHeartBeatTest start");
492+
ObDirectLoadConnection connection = null;
493+
ObDirectLoadStatement statement = null;
494+
try {
495+
prepareTestTable();
496+
System.out.println("prepareTestTable");
497+
498+
connection = buildConnection(1);
499+
statement = buildStatement(connection);
500+
501+
SendHeartBeat sendHeartBeat = new SendHeartBeat(connection, statement);
502+
Thread primaryThread = new Thread(sendHeartBeat);
503+
504+
statement.beginPeer();
505+
primaryThread.start();
506+
507+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
508+
ObObj[] rowObjs = new ObObj[2];
509+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 1);
510+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 2);
511+
bucket.addRow(rowObjs);
512+
statement.write(bucket);
513+
514+
statement.commit();
515+
sendHeartBeat.stopHeartBeat();
516+
517+
primaryThread.join();
518+
519+
queryTestTable(1);
520+
} catch (Exception e) {
521+
throw new RuntimeException(e);
522+
} finally {
523+
if (null != statement) {
524+
statement.close();
525+
}
526+
if (null != connection) {
527+
connection.close();
528+
}
529+
}
530+
System.out.println("SendHeartBeatTest successful");
531+
}
532+
533+
};
534+
535+
private static class AbortBeforeCommitTest {
536+
537+
public static void run() {
538+
System.out.println("AbortBeforeCommitTest start");
539+
ObDirectLoadConnection connection = null;
540+
ObDirectLoadStatement statement = null;
541+
try {
542+
prepareTestTable();
543+
System.out.println("prepareTestTable");
544+
545+
connection = buildConnection(1);
546+
statement = buildStatement(connection);
547+
548+
statement.begin();
549+
550+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
551+
ObObj[] rowObjs = new ObObj[2];
552+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 1);
553+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 2);
554+
bucket.addRow(rowObjs);
555+
statement.write(bucket);
556+
557+
statement.abort();
558+
559+
queryTestTable(0);
560+
} catch (Exception e) {
561+
throw new RuntimeException(e);
562+
} finally {
563+
if (null != statement) {
564+
statement.close();
565+
}
566+
if (null != connection) {
567+
connection.close();
568+
}
569+
}
570+
System.out.println("AbortBeforeCommitTest successful");
571+
}
572+
573+
};
574+
575+
private static class AbortAfterCommitAsyncTest {
576+
577+
public static void run() {
578+
System.out.println("AbortAfterCommitAsyncTest start");
579+
ObDirectLoadConnection connection = null;
580+
ObDirectLoadStatement statement = null;
581+
try {
582+
prepareTestTable();
583+
System.out.println("prepareTestTable");
584+
585+
connection = buildConnection(1);
586+
statement = buildStatement(connection);
587+
588+
statement.begin();
589+
590+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
591+
ObObj[] rowObjs = new ObObj[2];
592+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 1);
593+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 2);
594+
bucket.addRow(rowObjs);
595+
statement.write(bucket);
596+
597+
statement.commitAsync();
598+
599+
statement.abort();
600+
601+
queryTestTable(0);
602+
} catch (Exception e) {
603+
throw new RuntimeException(e);
604+
} finally {
605+
if (null != statement) {
606+
statement.close();
607+
}
608+
if (null != connection) {
609+
connection.close();
610+
}
611+
}
612+
System.out.println("AbortAfterCommitAsyncTest successful");
613+
}
614+
615+
};
616+
312617
}

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadStatement.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.alipay.oceanbase.rpc.direct_load.execution.ObDirectLoadStatementExecutor;
2929
import com.alipay.oceanbase.rpc.direct_load.future.ObDirectLoadStatementFailedFuture;
3030
import com.alipay.oceanbase.rpc.direct_load.future.ObDirectLoadStatementFuture;
31+
import com.alipay.oceanbase.rpc.direct_load.protocol.payload.ObTableLoadClientStatus;
3132
import com.alipay.oceanbase.rpc.direct_load.util.ObDirectLoadUtil;
3233
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObLoadDupActionType;
3334

@@ -251,6 +252,24 @@ public void begin() throws ObDirectLoadException {
251252
}
252253
}
253254

255+
public ObDirectLoadStatementFuture beginPeerAsync() {
256+
try {
257+
checkStatus();
258+
return executor.beginPeer();
259+
} catch (ObDirectLoadException e) {
260+
logger.warn("statement begin peer failed", e);
261+
return new ObDirectLoadStatementFailedFuture(this, e);
262+
}
263+
}
264+
265+
public void beginPeer() throws ObDirectLoadException {
266+
ObDirectLoadStatementFuture future = beginPeerAsync();
267+
future.await();
268+
if (!future.isSuccess()) {
269+
throw future.cause();
270+
}
271+
}
272+
254273
public ObDirectLoadStatementFuture commitAsync() {
255274
try {
256275
checkStatus();
@@ -304,6 +323,25 @@ public void resume(ObDirectLoadStatementExecutionId executionId) throws ObDirect
304323
executor.resume(executionId);
305324
}
306325

326+
public void resumePeer(ObDirectLoadStatementExecutionId executionId, ObDirectLoadTraceId newTraceId) throws ObDirectLoadException {
327+
if (executionId == null || !executionId.isValid()) {
328+
logger.warn("Param 'executionId' must not be null or invalid, value:" + executionId);
329+
throw new ObDirectLoadIllegalArgumentException(
330+
"Param 'executionId' must not be null or invalid, value:" + executionId);
331+
}
332+
checkStatus();
333+
executor.resumePeer(executionId, newTraceId);
334+
}
335+
336+
public ObTableLoadClientStatus sendHeartBeat() throws ObDirectLoadException {
337+
checkStatus();
338+
return executor.sendHeartBeat();
339+
}
340+
341+
public void abort() throws ObDirectLoadException {
342+
executor.execAbort();
343+
}
344+
307345
public static final class Builder {
308346

309347
private final ObDirectLoadConnection connection;

0 commit comments

Comments
 (0)