4545import org .slf4j .Logger ;
4646import org .slf4j .LoggerFactory ;
4747
48+ import com .dremio .common .exceptions .UserException ;
4849import com .dremio .common .exceptions .UserRemoteException ;
4950import com .dremio .common .utils .protos .ExternalIdHelper ;
5051import com .dremio .common .utils .protos .QueryWritableBatch ;
@@ -100,9 +101,11 @@ class Producer implements FlightProducer, AutoCloseable {
100101
101102 @ Override
102103 public void doAction (CallContext context , Action action , StreamListener <Result > resultStreamListener ) {
104+ throw Status .UNIMPLEMENTED .asRuntimeException ();
103105 }
104106
105107 private FlightInfo getInfo (CallContext callContext , FlightDescriptor descriptor , String sql ) {
108+ logger .info ("GetFlightInfo called for sql {}" , sql );
106109 return getInfoImpl (callContext , descriptor , sql );
107110 }
108111
@@ -116,8 +119,9 @@ private FlightInfo getInfoImpl(CallContext callContext, FlightDescriptor descrip
116119
117120 UserRequest request = new UserRequest (RpcType .CREATE_PREPARED_STATEMENT , req );
118121 Prepare prepare = new Prepare ();
119- submitWork (callContext , request , prepare );
120- return prepare .getInfo (descriptor );
122+
123+ UserBitShared .ExternalId externalId = submitWork (callContext , request , prepare );
124+ return prepare .getInfo (descriptor , externalId );
121125 } catch (Exception e ) {
122126 e .printStackTrace ();
123127 throw new RuntimeException (e );
@@ -126,13 +130,12 @@ private FlightInfo getInfoImpl(CallContext callContext, FlightDescriptor descrip
126130
127131 @ Override
128132 public FlightInfo getFlightInfo (CallContext callContext , FlightDescriptor descriptor ) {
129- logger .info ("called get flight info" );
130133 return getInfo (callContext , descriptor , new String (descriptor .getCommand ()));
131134 }
132135
133136 @ Override
134137 public Runnable acceptPut (CallContext callContext , FlightStream flightStream , StreamListener <org .apache .arrow .flight .PutResult > streamListener ) {
135- throw Status .UNAVAILABLE .asRuntimeException ();
138+ throw Status .UNIMPLEMENTED .asRuntimeException ();
136139 }
137140
138141 private UserBitShared .ExternalId submitWork (CallContext callContext , UserRequest request , UserResponseHandler handler ) {
@@ -143,6 +146,7 @@ private UserBitShared.ExternalId submitWork(CallContext callContext, UserRequest
143146 handler ,
144147 request ,
145148 TerminationListenerRegistry .NOOP );
149+ logger .debug ("Submitted job {} from flight for request with type {}" , ExternalIdHelper .toQueryId (externalId ), request .getType ());
146150 return externalId ;
147151 }
148152
@@ -158,28 +162,34 @@ private class Prepare implements UserResponseHandler {
158162 public Prepare () {
159163 }
160164
161- public FlightInfo getInfo (FlightDescriptor descriptor ) {
165+ public FlightInfo getInfo (FlightDescriptor descriptor , UserBitShared . ExternalId externalId ) {
162166 try {
167+ logger .debug ("Waiting for prepared statement handle to return for job id {}" , ExternalIdHelper .toQueryId (externalId ));
163168 CreatePreparedStatementResp handle = future .get ();
169+ logger .debug ("prepared statement handle for job id {} has returned" , ExternalIdHelper .toQueryId (externalId ));
164170 if (handle .getStatus () == RequestStatus .FAILED ) {
165- throw Status .UNKNOWN .withDescription (handle .getError ().getMessage ()).withCause (UserRemoteException .create (handle .getError ())).asRuntimeException ();
171+ logger .warn ("prepared statement handle for job id " + ExternalIdHelper .toQueryId (externalId ) + " has failed" , UserRemoteException .create (handle .getError ()));
172+ throw Status .INTERNAL .withDescription (handle .getError ().getMessage ()).withCause (UserRemoteException .create (handle .getError ())).asRuntimeException ();
166173 }
174+ logger .debug ("prepared statement handle for job id {} has succeeded" , ExternalIdHelper .toQueryId (externalId ));
167175 PreparedStatement statement = handle .getPreparedStatement ();
168176 Ticket ticket = new Ticket (statement .getServerHandle ().toByteArray ());
169177 FlightEndpoint endpoint = new FlightEndpoint (ticket , location );
170- FlightInfo info = new FlightInfo (fromMetadata (statement .getColumnsList ()), descriptor , Lists .newArrayList (endpoint ), -1L , -1L );
178+ logger .debug ("flight endpoint for job id {} has been created with ticket {}" , ExternalIdHelper .toQueryId (externalId ), new String (ticket .getBytes ()));
179+ Schema schema = fromMetadata (statement .getColumnsList ());
180+ FlightInfo info = new FlightInfo (schema , descriptor , Lists .newArrayList (endpoint ), -1L , -1L );
181+ logger .debug ("flight info for job id {} has been created with schema {}" , ExternalIdHelper .toQueryId (externalId ), schema .toJson ());
171182 return info ;
172- } catch (ExecutionException e ) {
173- throw new RuntimeException (e .getCause ());
174- } catch (InterruptedException e ) {
175- throw new RuntimeException (e );
183+ } catch (Exception e ) {
184+ logger .warn ("prepared statement handle for job id " + ExternalIdHelper .toQueryId (externalId ) + " has failed" , UserException .parseError (e ).buildSilently ());
185+ throw Status .UNKNOWN .withCause (UserException .parseError (e ).buildSilently ()).asRuntimeException ();
176186 }
177187 }
178188
179189 private Schema fromMetadata (List <ResultColumnMetadata > rcmd ) {
180190
181191 Schema schema = new Schema (rcmd .stream ().map (md -> {
182- ArrowType arrowType = SqlTypeNameToArrowType .toArrowType (md . getDataType () );
192+ ArrowType arrowType = SqlTypeNameToArrowType .toArrowType (md );
183193 FieldType fieldType = new FieldType (md .getIsNullable (), arrowType , null , null );
184194 return new Field (md .getColumnName (), fieldType , null );
185195 }).collect (Collectors .toList ()));
@@ -188,7 +198,7 @@ private Schema fromMetadata(List<ResultColumnMetadata> rcmd) {
188198
189199 @ Override
190200 public void sendData (RpcOutcomeListener <Ack > outcomeListener , QueryWritableBatch result ) {
191- throw new IllegalStateException ();
201+ throw Status . UNIMPLEMENTED . asRuntimeException ();
192202 }
193203
194204 @ Override
0 commit comments