@@ -64,6 +64,7 @@ import (
6464 "database/sql/driver"
6565 "errors"
6666 "fmt"
67+ "sync/atomic"
6768 "unsafe"
6869
6970 "github.com/apache/arrow-go/v18/arrow"
@@ -93,6 +94,8 @@ func NewArrowFromConn(driverConn driver.Conn) (*Arrow, error) {
9394
9495// arrowStreamReader implements array.RecordReader for streaming DuckDB results.
9596type arrowStreamReader struct {
97+ refCount int64
98+
9699 ctx context.Context
97100 stmt * Stmt
98101 res * C.duckdb_arrow
@@ -103,20 +106,29 @@ type arrowStreamReader struct {
103106 err error
104107}
105108
106- func (r * arrowStreamReader ) Retain () {}
109+ // Retain increases the reference count by 1.
110+ // Retain may be called simultaneously from multiple goroutines.
111+ func (r * arrowStreamReader ) Retain () {
112+ atomic .AddInt64 (& r .refCount , 1 )
113+ }
107114
115+ // Release decreases the reference count by 1.
116+ // When the reference count goes to zero, the memory is freed.
117+ // Release may be called simultaneously from multiple goroutines.
108118func (r * arrowStreamReader ) Release () {
109- if r .currentRec != nil {
110- r .currentRec .Release ()
111- r .currentRec = nil
112- }
113- if r .res != nil {
114- C .duckdb_destroy_arrow (r .res )
115- r .res = nil
116- }
117- if r .stmt != nil {
118- r .stmt .Close ()
119- r .stmt = nil
119+ if atomic .AddInt64 (& r .refCount , - 1 ) == 0 {
120+ if r .currentRec != nil {
121+ r .currentRec .Release ()
122+ r .currentRec = nil
123+ }
124+ if r .res != nil {
125+ C .duckdb_destroy_arrow (r .res )
126+ r .res = nil
127+ }
128+ if r .stmt != nil {
129+ r .stmt .Close ()
130+ r .stmt = nil
131+ }
120132 }
121133}
122134
@@ -207,6 +219,7 @@ func (a *Arrow) QueryContext(ctx context.Context, query string, args ...any) (ar
207219 }
208220
209221 return & arrowStreamReader {
222+ refCount : 1 ,
210223 ctx : ctx ,
211224 stmt : stmt ,
212225 res : res ,
0 commit comments