diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index af688ebfb62..a69f9fbabf9 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -20,7 +20,7 @@ import { ReadConcern, type ReadConcernLike } from '../read_concern'; import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import { type AsyncDisposable, configureResourceManagement } from '../resource_management'; import type { Server } from '../sdam/server'; -import { ClientSession, maybeClearPinnedConnection } from '../sessions'; +import { type ClientSession, maybeClearPinnedConnection } from '../sessions'; import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout'; import { addAbortListener, @@ -227,7 +227,7 @@ export abstract class AbstractCursor< /** @internal */ private cursorId: Long | null; /** @internal */ - private cursorSession: ClientSession; + private cursorSession: ClientSession | null; /** @internal */ private selectedServer?: Server; /** @internal */ @@ -352,11 +352,7 @@ export abstract class AbstractCursor< this.cursorOptions.maxAwaitTimeMS = options.maxAwaitTimeMS; } - if (options.session instanceof ClientSession) { - this.cursorSession = options.session; - } else { - this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false }); - } + this.cursorSession = options.session ?? null; this.deserializationOptions = { ...this.cursorOptions, @@ -413,7 +409,7 @@ export abstract class AbstractCursor< } /** @internal */ - get session(): ClientSession { + get session(): ClientSession | null { return this.cursorSession; } @@ -877,11 +873,12 @@ export abstract class AbstractCursor< this.trackCursor(); // We only want to end this session if we created it, and it hasn't ended yet - if (this.cursorSession.explicit === false) { + if (this.cursorSession?.explicit === false) { if (!this.cursorSession.hasEnded) { this.cursorSession.endSession().then(undefined, squashError); } - this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false }); + + this.cursorSession = null; } } @@ -907,6 +904,13 @@ export abstract class AbstractCursor< 'Unexpected null selectedServer. A cursor creating command should have set this' ); } + + if (this.cursorSession == null) { + throw new MongoRuntimeError( + 'Unexpected null session. A cursor creating command should have set this' + ); + } + const getMoreOptions = { ...this.cursorOptions, session: this.cursorSession, @@ -941,6 +945,7 @@ export abstract class AbstractCursor< ); } try { + this.cursorSession ??= this.cursorClient.startSession({ owner: this, explicit: false }); const state = await this._initialize(this.cursorSession); // Set omitMaxTimeMS to the value needed for subsequent getMore calls this.cursorOptions.omitMaxTimeMS = this.cursorOptions.timeoutMS != null; @@ -1032,41 +1037,57 @@ export abstract class AbstractCursor< return this.timeoutContext?.refreshed(); } }; - try { - if ( - !this.isKilled && - this.cursorId && - !this.cursorId.isZero() && - this.cursorNamespace && - this.selectedServer && - !this.cursorSession.hasEnded - ) { - this.isKilled = true; - const cursorId = this.cursorId; - this.cursorId = Long.ZERO; - - await executeOperation( - this.cursorClient, - new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, { - session: this.cursorSession - }), - timeoutContextForKillCursors() - ); + + const withEmitClose = async (fn: () => Promise) => { + try { + await fn(); + } finally { + this.emitClose(); } - } catch (error) { - squashError(error); - } finally { + }; + + const close = async () => { + // if no session has been defined on the cursor, the cursor was never initialized + // or the cursor was re-wound and never re-iterated. In either case, we + // 1. do not need to end the session (there is no session after all) + // 2. do not need to kill the cursor server-side + const session = this.cursorSession; + if (!session) return; + try { - if (this.cursorSession?.owner === this) { - await this.cursorSession.endSession({ error }); - } - if (!this.cursorSession?.inTransaction()) { - maybeClearPinnedConnection(this.cursorSession, { error }); + if ( + !this.isKilled && + this.cursorId && + !this.cursorId.isZero() && + this.cursorNamespace && + this.selectedServer && + !session.hasEnded + ) { + this.isKilled = true; + const cursorId = this.cursorId; + this.cursorId = Long.ZERO; + + await executeOperation( + this.cursorClient, + new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, { + session + }), + timeoutContextForKillCursors() + ); } + } catch (error) { + squashError(error); } finally { - this.emitClose(); + if (session.owner === this) { + await session.endSession({ error }); + } + if (!session.inTransaction()) { + maybeClearPinnedConnection(session, { error }); + } } - } + }; + + await withEmitClose(close); } /** @internal */ diff --git a/src/cursor/run_command_cursor.ts b/src/cursor/run_command_cursor.ts index d5b90eeda9d..0e9992a2aa4 100644 --- a/src/cursor/run_command_cursor.ts +++ b/src/cursor/run_command_cursor.ts @@ -1,7 +1,7 @@ import type { BSONSerializeOptions, Document } from '../bson'; import { CursorResponse } from '../cmap/wire_protocol/responses'; import type { Db } from '../db'; -import { MongoAPIError } from '../error'; +import { MongoAPIError, MongoRuntimeError } from '../error'; import { executeOperation } from '../operations/execute_operation'; import { GetMoreOperation } from '../operations/get_more'; import { RunCommandOperation } from '../operations/run_command'; @@ -161,6 +161,12 @@ export class RunCommandCursor extends AbstractCursor { /** @internal */ override async getMore(_batchSize: number): Promise { + if (!this.session) { + throw new MongoRuntimeError( + 'Unexpected null session. A cursor creating command should have set this' + ); + } + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const getMoreOperation = new GetMoreOperation(this.namespace, this.id!, this.server!, { ...this.cursorOptions, diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index ca35bdaef94..918b419a0c6 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -1676,22 +1676,24 @@ describe('Cursor', function () { const collection = await client.db().collection('test'); const cursor = collection.find({}); + await cursor.next(); + const clonedCursor = cursor.clone(); - expect(cursor).to.have.property('session'); - expect(clonedCursor).to.have.property('session'); - expect(cursor.session).to.not.equal(clonedCursor.session); + expect(cursor).to.have.property('session').not.to.be.null; + expect(clonedCursor).to.have.property('session').to.be.null; }); it('removes session when cloning an aggregation cursor', async function () { const collection = await client.db().collection('test'); const cursor = collection.aggregate([{ $match: {} }]); + await cursor.next(); + const clonedCursor = cursor.clone(); - expect(cursor).to.have.property('session'); - expect(clonedCursor).to.have.property('session'); - expect(cursor.session).to.not.equal(clonedCursor.session); + expect(cursor).to.have.property('session').not.to.be.null; + expect(clonedCursor).to.have.property('session').to.be.null; }); it('destroying a stream stops it', async function () { @@ -3598,42 +3600,38 @@ describe('Cursor', function () { }); context('when executing on a find cursor', function () { - it('removes the existing session from the cloned cursor', function () { + it('removes the existing session from the cloned cursor', async function () { const docs = [{ name: 'test1' }, { name: 'test2' }]; - return collection.insertMany(docs).then(() => { - const cursor = collection.find({}, { batchSize: 1 }); - return cursor - .next() - .then(doc => { - expect(doc).to.exist; - const clonedCursor = cursor.clone(); - expect(clonedCursor.cursorOptions.session).to.not.exist; - expect(clonedCursor.session).to.have.property('_serverSession', null); // session is brand new and has not been used - }) - .finally(() => { - return cursor.close(); - }); - }); + await collection.insertMany(docs); + + const cursor = collection.find({}, { batchSize: 1 }); + try { + const doc = await cursor.next(); + expect(doc).to.exist; + + const clonedCursor = cursor.clone(); + expect(clonedCursor.session).to.be.null; + } finally { + await cursor.close(); + } }); }); context('when executing on an aggregation cursor', function () { - it('removes the existing session from the cloned cursor', function () { + it('removes the existing session from the cloned cursor', async function () { const docs = [{ name: 'test1' }, { name: 'test2' }]; - return collection.insertMany(docs).then(() => { - const cursor = collection.aggregate([{ $match: {} }], { batchSize: 1 }); - return cursor - .next() - .then(doc => { - expect(doc).to.exist; - const clonedCursor = cursor.clone(); - expect(clonedCursor.cursorOptions.session).to.not.exist; - expect(clonedCursor.session).to.have.property('_serverSession', null); // session is brand new and has not been used - }) - .finally(() => { - return cursor.close(); - }); - }); + await collection.insertMany(docs); + + const cursor = collection.aggregate([{ $match: {} }], { batchSize: 1 }); + try { + const doc = await cursor.next(); + expect(doc).to.exist; + + const clonedCursor = cursor.clone(); + expect(clonedCursor.session).to.be.null; + } finally { + await cursor.close(); + } }); }); }); diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index 2ca0459419e..4eb1b521dfe 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -23,6 +23,43 @@ import { clearFailPoint, configureFailPoint } from '../../tools/utils'; import { filterForCommands } from '../shared'; describe('class AbstractCursor', function () { + describe('lazy implicit session acquisition', function () { + let client: MongoClient; + let collection: Collection; + const docs = [{ count: 0 }, { count: 10 }]; + + beforeEach(async function () { + client = this.configuration.newClient(); + + collection = client.db('abstract_cursor_integration').collection('test'); + + await collection.insertMany(docs); + }); + + afterEach(async function () { + await collection.deleteMany({}); + await client.close(); + }); + + it('does not allocate a session when the cursor is constructed', function () { + const cursor = collection.find(); + expect(cursor.session).to.be.null; + }); + + it('allocates a session once the cursor is initialized', async function () { + const cursor = collection.find({}, { batchSize: 1 }); + await cursor.next(); + expect(cursor.session).not.to.be.null; + }); + + it('sets the session to `null` when rewound', async function () { + const cursor = collection.find({}, { batchSize: 1 }); + await cursor.next(); + cursor.rewind(); + expect(cursor.session).to.be.null; + }); + }); + describe('regression tests NODE-5372', function () { let client: MongoClient; let collection: Collection; diff --git a/test/unit/cursor/abstract_cursor.test.ts b/test/unit/cursor/abstract_cursor.test.ts index bb68a74318a..7665c8744f2 100644 --- a/test/unit/cursor/abstract_cursor.test.ts +++ b/test/unit/cursor/abstract_cursor.test.ts @@ -4,7 +4,7 @@ import { AbstractCursor, type AbstractCursorOptions, type Callback, - ClientSession, + type ClientSession, type ExecutionResult, MongoClient, ns, @@ -32,9 +32,9 @@ describe('class AbstractCursor', () => { }); context('#constructor', () => { - it('creates a session if none passed in', () => { + it('does not create a session if none passed in', () => { const cursor = new ConcreteCursor(client); - expect(cursor).to.have.property('session').that.is.instanceOf(ClientSession); + expect(cursor).to.have.property('session').that.is.null; }); it('uses the passed in session', async () => { diff --git a/test/unit/cursor/aggregation_cursor.test.ts b/test/unit/cursor/aggregation_cursor.test.ts index 82ae18745b0..e0d7a1223c1 100644 --- a/test/unit/cursor/aggregation_cursor.test.ts +++ b/test/unit/cursor/aggregation_cursor.test.ts @@ -29,10 +29,9 @@ describe('class AggregationCursor', () => { }); context('clone()', () => { - it('returns a new cursor with a different session', () => { + it('returns a new cursor', () => { const cloned = cursor.clone(); expect(cursor).to.not.equal(cloned); - expect(cursor.session).to.not.equal(cloned.session); }); });