Skip to content

add new interfaces to P2P Mode #330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

lalalalalafeier
Copy link

Summary

add new interfaces to support peer node and abort

Solution Description

@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


sa-buc seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

@@ -232,7 +257,11 @@ public void close() {
}
}
// 退出任务
abortIfNeed();
if (isPeer) {
abortIfNeedWhenPeerClosed();

This comment was marked as resolved.

executor.resumePeer(executionId, newTraceId);
}

public ObTableLoadClientStatus sendHeartBeat() throws ObDirectLoadException {

This comment was marked as resolved.

@@ -304,6 +323,25 @@ public void resume(ObDirectLoadStatementExecutionId executionId) throws ObDirect
executor.resume(executionId);
}

public void resumePeer(ObDirectLoadStatementExecutionId executionId, ObDirectLoadTraceId newTraceId) throws ObDirectLoadException {

This comment was marked as resolved.

This comment was marked as resolved.

@@ -309,4 +315,303 @@ public static void run() {

};

private static class MultiPeerNodeWriteTest {

This comment was marked as resolved.

This comment was marked as resolved.

connection = buildConnection(1);
statement = buildStatement(connection);

SendHeartBeat sendHeartBeat = new SendHeartBeat(connection, statement);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不应该在SendHeartBeat里面再build一个新的stmt去发心跳么?

@@ -251,6 +252,24 @@ public void begin() throws ObDirectLoadException {
}
}

public ObDirectLoadStatementFuture beginPeerAsync() {

This comment was marked as resolved.

@lalalalalafeier lalalalalafeier changed the title add new interfaces to support peer node and abort add new interfaces to P2P Mode Apr 30, 2025
@@ -279,6 +282,10 @@ public void write(ObDirectLoadBucket bucket) throws ObDirectLoadException {
executor.write(bucket);
}

public void startHeartBeat() throws ObDirectLoadException {
executor.startHeartBeatInP2PMode();

This comment was marked as resolved.

@@ -377,6 +389,11 @@ public ObDirectLoadStatement build() throws ObDirectLoadException {
return connection.buildStatement(this);
}

public Builder setP2PMode() {

This comment was marked as resolved.

@@ -355,6 +419,10 @@ public void write(ObDirectLoadBucket bucket) throws ObDirectLoadException {
}
}

public void execAbort() throws ObDirectLoadException {
abort();

This comment was marked as resolved.

@@ -382,6 +388,11 @@ public ObDirectLoadTraceId getTraceId() {
return traceId;
}

public Builder setNodeRole(NodeRole nodeRole) {

This comment was marked as resolved.

@@ -317,7 +336,7 @@ private synchronized void abortIfNeed() {
}
}

private ObDirectLoadStatementFuture abort() {
public ObDirectLoadStatementFuture abort() {

This comment was marked as resolved.

Thread thread = new Thread(NodeWriter);
threads[i] = thread;
}
P2PNodeWriter commitNodeWriter = new P2PNodeWriter(executionIdBytes, writeThreadNum, threads);

This comment was marked as resolved.

@@ -58,13 +58,19 @@ public class ObDirectLoadStatementExecutor {
private long taskId = 0;
private ObAddr svrAddr = null;
private ObDirectLoadException cause = null; // 失败原因
private NodeRole nodeRole = NodeRole.PRIMARY;

This comment was marked as resolved.

compareAndSetState(NONE, LOADING_ONLY, "resume");
if (NodeRole.P2P == nodeRole) {
compareAndSetState(NONE, LOADING, "resume in P2P mode");
startHeartBeat();

This comment was marked as resolved.

@lalalalalafeier lalalalalafeier force-pushed the master branch 3 times, most recently from e1d7a8e to 24c79d5 Compare May 27, 2025 09:06
@@ -327,6 +331,7 @@ public static final class Builder {
private ObDirectLoadStatementExecutionId executionId = null;

private static final long MAX_QUERY_TIMEOUT = Integer.MAX_VALUE;
private boolean isP2PMode = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么加到这个位置?

@@ -123,6 +141,16 @@ public synchronized ObDirectLoadStatementFuture begin() {
public synchronized ObDirectLoadStatementFuture commit() {
logger.info("statement call commit");
ObDirectLoadStatementAsyncPromiseTask task = null;
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥要套一层这个?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants